class="hide-preCode-box">

- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
Order.API/MessageDto/CreateOrderMessageDto.cs:
public class CreateOrderMessageDto
{
public int ProductID { get; set; }
public int Count { get; set; }
}
class="hljs-button signin active" data-title="登录复制" data-report-click="{"spm":"1001.2101.3001.4334"}">
class="hide-preCode-box">
Order.API/Models/Order.cs订单实体类:
public class Order
{
[Key]
[DatabaseGenerated(DatabaseGeneratedOption.Identity)]
public int ID { get; set; }
[Required]
public DateTime CreateTime { get; set; }
[Required]
public int ProductID { get; set; }
[Required]
public int Count { get; set; }
}
class="hljs-button signin active" data-title="登录复制" data-report-click="{"spm":"1001.2101.3001.4334"}">
class="hide-preCode-box">
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
Order.API/Models/OrderContext.cs数据库Context:
public class OrderContext : DbContext
{
public OrderContext(DbContextOptions<OrderContext> options)
: base(options)
{
}
public DbSet<Order> Orders { get; set; }
protected override void OnModelCreating(ModelBuilder modelBuilder)
{
}
}
class="hljs-button signin active" data-title="登录复制" data-report-click="{"spm":"1001.2101.3001.4334"}">
class="hide-preCode-box">
Order.API/appsettings.json增加数据库连接字符串:
"ConnectionStrings": {
"OrderContext": "User ID=postgres;Password=pg123456;Host=host.docker.internal;Port=5432;Database=Order;Pooling=true;"
}
class="hljs-button signin active" data-title="登录复制" data-report-click="{"spm":"1001.2101.3001.4334"}">
Order.API/Startup.cs修改ConfigureServices方法,添加Cap配置:
public void ConfigureServices(IServiceCollection services)
{
services.AddControllers();
services.AddDbContext<OrderContext>(opt => opt.UseNpgsql(Configuration.GetConnectionString("OrderContext")));
services.AddCap(x =>
{
x.UseEntityFramework<OrderContext>();
x.UseRabbitMQ("host.docker.internal");
});
}
class="hljs-button signin active" data-title="登录复制" data-report-click="{"spm":"1001.2101.3001.4334"}">

以上是订单服务的修改。
Product.API/Controllers/ProductsController.cs增加减库存接口:
[Route("[controller]")]
[ApiController]
public class ProductsController : ControllerBase
{
private readonly ILogger<ProductsController> _logger;
private readonly IConfiguration _configuration;
private readonly ICapPublisher _capBus;
private readonly ProductContext _context;
public ProductsController(ILogger<ProductsController> logger, IConfiguration configuration, ICapPublisher capPublisher, ProductContext context)
{
_logger = logger;
_configuration = configuration;
_capBus = capPublisher;
_context = context;
}
[HttpGet]
public IActionResult Get()
{
string result = $"【产品服务】{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}——" +
$"{Request.HttpContext.Connection.LocalIpAddress}:{_configuration["ConsulSetting:ServicePort"]}";
return Ok(result);
}
[NonAction]
[CapSubscribe("order.services.createorder")]
public async Task ReduceStock(CreateOrderMessageDto message)
{
var product = await _context.Products.FirstOrDefaultAsync(p => p.ID == message.ProductID);
product.Stock -= message.Count;
await _context.SaveChangesAsync();
}
}
class="hljs-button signin active" data-title="登录复制" data-report-click="{"spm":"1001.2101.3001.4334"}">
class="hide-preCode-box">
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
Product.API/MessageDto/CreateOrderMessageDto.cs:
public class CreateOrderMessageDto
{
public int ProductID { get; set; }
public int Count { get; set; }
}
class="hljs-button signin active" data-title="登录复制" data-report-click="{"spm":"1001.2101.3001.4334"}">
class="hide-preCode-box">
Product.API/Models/Product.cs产品实体类:
public class Product
{
[Key]
[DatabaseGenerated(DatabaseGeneratedOption.Identity)]
public int ID { get; set; }
[Required]
[Column(TypeName = "VARCHAR(16)")]
public string Name { get; set; }
[Required]
public int Stock { get; set; }
}
class="hljs-button signin active" data-title="登录复制" data-report-click="{"spm":"1001.2101.3001.4334"}">
class="hide-preCode-box">
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
Product.API/Models/ProductContext.cs数据库Context:
public class ProductContext : DbContext
{
public ProductContext(DbContextOptions<ProductContext> options)
: base(options)
{
}
public DbSet<Product> Products { get; set; }
protected override void OnModelCreating(ModelBuilder modelBuilder)
{
base.OnModelCreating(modelBuilder);
modelBuilder.Entity<Product>().HasData(new Product
{
ID = 1,
Name = "产品1",
Stock = 100
},
new Product
{
ID = 2,
Name = "产品2",
Stock = 100
});
}
}
class="hljs-button signin active" data-title="登录复制" data-report-click="{"spm":"1001.2101.3001.4334"}">
class="hide-preCode-box">
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
Product.API/appsettings.json增加数据库连接字符串:
"ConnectionStrings": {
"ProductContext": "User ID=postgres;Password=pg123456;Host=host.docker.internal;Port=5432;Database=Product;Pooling=true;"
}
class="hljs-button signin active" data-title="登录复制" data-report-click="{"spm":"1001.2101.3001.4334"}">
Product.API/Startup.cs修改ConfigureServices方法,添加Cap配置:\
public void ConfigureServices(IServiceCollection services)
{
services.AddControllers();
services.AddDbContext<ProductContext>(opt => opt.UseNpgsql(Configuration.GetConnectionString("ProductContext")));
services.AddCap(x =>
{
x.UseEntityFramework<ProductContext>();
x.UseRabbitMQ("host.docker.internal");
});
}
class="hljs-button signin active" data-title="登录复制" data-report-click="{"spm":"1001.2101.3001.4334"}">

以上是产品服务的修改。
订单服务和产品服务的修改到此就完成了,看着修改很多,其实功能很简单。就是各自增加了自己的数据库表,然后订单服务增加了下单接口,下单接口会发出 “下单事件”
。产品服务增加了减库存接口,减库存接口会订阅 “下单事件”
。然后客户端调用下单接口下单时,产品服务会减去相应的库存,功能就这么简单。
关于 EF数据库迁移
之类的基本使用就不介绍了。使用 Docker
重新构建镜像,运行订单服务,产品服务:
docker build -t orderapi:1.1 -f ./Order.API/Dockerfile .
docker run -d -p 9060:80 --name orderservice orderapi:1.1 --ConsulSetting:ServicePort="9060"
docker run -d -p 9061:80 --name orderservice1 orderapi:1.1 --ConsulSetting:ServicePort="9061"
docker run -d -p 9062:80 --name orderservice2 orderapi:1.1 --ConsulSetting:ServicePort="9062"
docker build -t productapi:1.1 -f ./Product.API/Dockerfile .
docker run -d -p 9050:80 --name productservice productapi:1.1 --ConsulSetting:ServicePort="9050"
docker run -d -p 9051:80 --name productservice1 productapi:1.1 --ConsulSetting:ServicePort="9051"
docker run -d -p 9052:80 --name productservice2 productapi:1.1 --ConsulSetting:ServicePort="9052"
class="hljs-button signin active" data-title="登录复制" data-report-click="{"spm":"1001.2101.3001.4334"}">
最后 Ocelot.APIGateway/ocelot.json 增加一条路由配置:

好了,进行到这里,整个环境就有点复杂了。确保我们的PostgreSQL,RabbitMQ,Consul,Gateway,服务实例都正常运行。
服务实例运行成功后,数据库应该是这样的:




📃产品表种子数据:

cap.published
表和 cap.received
表是由 CAP
自动生成的,它内部是使用本地消息表+MQ来实现异步确保。
三、运行测试
这次使用 Postman
作为客户端调用下单接口( 9070
是之前的 Ocelot
网关端口):

订单库 published
表:

订单库 order
表:

产品库 received
表:

产品库 product
表:

再试一下:


OK,完成。虽然功能很简单,但是我们实现了服务的解耦,异步调用,和最终一致性。
四、总结
注意,上面的例子纯粹是为了说明 EventBus
的使用,实际中的下单流程绝对不会这么做的!希望大家不要较真。
可能有人会说如果下单成功,但是库存不足导致减库存失败了怎么办,是不是要回滚订单表的数据?如果产生这种想法,说明还没有真正理解最终一致性的思想。
首先下单前肯定会检查一下库存数量,既然允许下单那么必然是库存充足的。这里的事务是指:订单保存到数据库,和下单事件保存到 cap.published
表(保存到 cap.published
表理论上就能够发送到MQ)这两件事情,要么一同成功,要么一同失败。如果这个事务成功,那么就可以认为这个业务流程是成功的,至于产品服务的减库存是否成功那就是产品服务的事情了(理论上也应该是成功的,因为消息已经确保发到了MQ,产品服务必然会收到消息),CAP也提供了失败重试,和失败回调机制。
如果非要数据回滚也是能实现的,CAP
的 ICapPublisher.Publish
方法提供一个 callbackName
参数,当减库存时,可以触发这个回调。其本质也是通过发布订阅完成,这是不推荐的做法,就不详细说了,有兴趣自己研究一下。
另外,CAP
无法保证消息不重复,实际使用中需要自己考虑一下消息的重复过滤和幂等性。

data-report-view="{"mod":"1585297308_001","spm":"1001.2101.3001.6548","dest":"http://iyenn.com/rec/1636892.html","extend1":"pc","ab":"new"}">>
评论记录:
回复评论: