首页 最新 热门 推荐

  • 首页
  • 最新
  • 热门
  • 推荐

Spring Cloud Bus

  • 25-03-02 14:21
  • 2433
  • 12262
blog.csdn.net

发博词

Spring生态消息中间件相关几个项目如下spring messaging,spring integration,spring cloud stream,spring cloud bus,项目是从前到后一次构建,后面的依赖前面的项目。本文重点介绍下spring cloud bus。

Spring Cloud Bus将Spring的事件处理机制和消息中间件消息的发送和接收整合起来,可以轻松的将分布式应用中连接有消息中间件的多个服务节点连接起来,实现消息互通。

原理

看两个例子就明白了。

spring cloud bus提供了两个endpoint,一个是/bus/refresh,一个是/bus/env。

@RequestMapping(value = "env", method = RequestMethod.POST)
@ResponseBody
@ManagedOperation
// TODO: make this an abstract method in AbstractBusEndpoint?
public void env(@RequestParam Map params,
		@RequestParam(value = "destination", required = false) String destination) {
	publish(new EnvironmentChangeRemoteApplicationEvent(this, getInstanceId(),
			destination, params));
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
@RequestMapping(value = "refresh", method = RequestMethod.POST)
	@ResponseBody
	@ManagedOperation
	public void refresh(
			@RequestParam(value = "destination", required = false) String destination) {
		publish(new RefreshRemoteApplicationEvent(this, getInstanceId(), destination));
	}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

消息首先通过Spring 的事件机制,发送到context中,然后看下面
BusAutoConfiguration#acceptLocal

@EventListener(classes = RemoteApplicationEvent.class)
	public void acceptLocal(RemoteApplicationEvent event) {
		if (this.serviceMatcher.isFromSelf(event)
				&& !(event instanceof AckRemoteApplicationEvent)) {
			this.cloudBusOutboundChannel.send(MessageBuilder.withPayload(event).build());
		}
	}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

事件消息在这里接收,然后发送到消息中间件。
BusAutoConfiguration#acceptRemote

@StreamListener(SpringCloudBusClient.INPUT)
	public void acceptRemote(RemoteApplicationEvent event) {
		if (event instanceof AckRemoteApplicationEvent) {
			if (this.bus.getTrace().isEnabled() && !this.serviceMatcher.isFromSelf(event)
					&& this.applicationEventPublisher != null) {
				this.applicationEventPublisher.publishEvent(event);
			}
			// If it's an ACK we are finished processing at this point
			return;
		}
		if (this.serviceMatcher.isForSelf(event)
				&& this.applicationEventPublisher != null) {
			if (!this.serviceMatcher.isFromSelf(event)) {
				this.applicationEventPublisher.publishEvent(event);
			}
			if (this.bus.getAck().isEnabled()) {
				AckRemoteApplicationEvent ack = new AckRemoteApplicationEvent(this,
						this.serviceMatcher.getServiceId(),
						this.bus.getAck().getDestinationService(),
						event.getDestinationService(), event.getId(), event.getClass());
				this.cloudBusOutboundChannel
						.send(MessageBuilder.withPayload(ack).build());
				this.applicationEventPublisher.publishEvent(ack);
			}
		}
		if (this.bus.getTrace().isEnabled() && this.applicationEventPublisher != null) {
			// We are set to register sent events so publish it for local consumption,
			// irrespective of the origin
			this.applicationEventPublisher.publishEvent(new SentApplicationEvent(this,
					event.getOriginService(), event.getDestinationService(),
					event.getId(), event.getClass()));
		}
	}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33

接收到消息之后,根据消息的来源,目的地(destination)配置等信息,将消息中间件过来的数据转化为RemoteApplicationEvent对象,再次发布到spring context中。
最终,事件消息被EnvironmentChangeListener和RefreshListener处理。注意这两个都是Spring ApplicationListener类的子类,最后一个章节里面自定义RemoteApplicationEvent的时候,我们最终处理这个事件的时候,就是实现一个对应事件的ApplicationListener,比如

public class EnvironmentChangeListener implements ApplicationListener
  • 1

Tracing Bus Events

使用spring.cloud.bus.trace.enabled=true开启trace之后,可以追踪所有此节点的RemoteApplicationEvent的各个子事件类型事件。
看个例子:

{
  "timestamp": "2015-11-26T10:24:44.411+0000",
  "info": {
    "signal": "spring.cloud.bus.ack",
    "type": "RefreshRemoteApplicationEvent",
    "id": "c4d374b7-58ea-4928-a312-31984def293b",
    "origin": "stores:8081",
    "destination": "*:**"
  }
  },
  {
  "timestamp": "2015-11-26T10:24:41.864+0000",
  "info": {
    "signal": "spring.cloud.bus.sent",
    "type": "RefreshRemoteApplicationEvent",
    "id": "c4d374b7-58ea-4928-a312-31984def293b",
    "origin": "customers:9000",
    "destination": "*:**"
  }
  },
  {
  "timestamp": "2015-11-26T10:24:41.862+0000",
  "info": {
    "signal": "spring.cloud.bus.ack",
    "type": "RefreshRemoteApplicationEvent",
    "id": "c4d374b7-58ea-4928-a312-31984def293b",
    "origin": "customers:9000",
    "destination": "*:**"
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30

上面的trace信息展示了customers:9000发送RefreshRemoteApplicationEvent 给所有的服务, 这个时间被customers:9000 和 stores:8081接收到,customers:9000是他自己。

自定义RemoteApplicationEvent

注册事件

提供了@RemoteApplicationEventScan注解可以非常方便的注册一个自定义的RemoteApplicationEvent。
自定义一个事件类型,然后使用@RemoteApplicationEventScan注解让Spring 启动的时候可以扫描到这个事件类:

package com.acme;

public class FooEvent extends RemoteApplicationEvent {
    ...
}

package com.acme;

@Configuration
@RemoteApplicationEventScan
public class BusConfiguration {
    ...
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

扫描到事件类然后注册到Spring IOC容器中的逻辑是RemoteApplicationEventRegistrar实现的,这是一个ImportBeanDefinitionRegistrar的实现类。这个类是专门用来在处理@Configuration注解的类的时候,在Spring IOC容器中注册其他的Bean用的。

通过上边的步骤之后,

发送事件

将ApplicationEventPublisher注入到业务逻辑类中,在业务逻辑里,使用ApplicationEventPublisher#publishEvent方法,将事件发送到context中,剩下的工作,将事件消息发送到消息中间件,接收消息中间件发送过来的事件消息,将接收到的事件消息发送到本地的context中,这三部工具就由spring cloud bus自动完成了。

发送事件可以参考EnvironmentBusEndpoint和RefreshBusEndpoint的实现。

接收处理事件

我们只需要实现一个ApplicationListener的事件监听器,等着处理事件就行了。
接收事件可以参考EnvironmentChangeListener和RefreshListener的实现。

Spring Cloud Bus 内置的两个接口bus/env和 bus/refresh可以认为是actuator的env 和refresh的分布式版本
注意消息中间件的特性,比如Kafka,同一个groupid的客户端只能有一个收到信息,那就要每一个Instance都是一个不同的groupid,即时是同一个微服务的不同Instance

文章知识点与官方知识档案匹配,可进一步学习相关知识
Java技能树首页概览148947 人正在系统学习中
注:本文转载自blog.csdn.net的陈振阳的文章"https://blog.csdn.net/xichenguan/article/details/77535694"。版权归原作者所有,此博客不拥有其著作权,亦不承担相应法律责任。如有侵权,请联系我们删除。
复制链接
复制链接
相关推荐
发表评论
登录后才能发表评论和回复 注册

/ 登录

评论记录:

未查询到任何数据!
回复评论:

分类栏目

后端 (14832) 前端 (14280) 移动开发 (3760) 编程语言 (3851) Java (3904) Python (3298) 人工智能 (10119) AIGC (2810) 大数据 (3499) 数据库 (3945) 数据结构与算法 (3757) 音视频 (2669) 云原生 (3145) 云平台 (2965) 前沿技术 (2993) 开源 (2160) 小程序 (2860) 运维 (2533) 服务器 (2698) 操作系统 (2325) 硬件开发 (2492) 嵌入式 (2955) 微软技术 (2769) 软件工程 (2056) 测试 (2865) 网络空间安全 (2948) 网络与通信 (2797) 用户体验设计 (2592) 学习和成长 (2593) 搜索 (2744) 开发工具 (7108) 游戏 (2829) HarmonyOS (2935) 区块链 (2782) 数学 (3112) 3C硬件 (2759) 资讯 (2909) Android (4709) iOS (1850) 代码人生 (3043) 阅读 (2841)

热门文章

101
推荐
关于我们 隐私政策 免责声明 联系我们
Copyright © 2020-2025 蚁人论坛 (iYenn.com) All Rights Reserved.
Scroll to Top