Eureka(AP) Eureka单机/集群配置 eureka的服务端加注解==@EnableEurekaServer==
1 2 3 4 5 <dependency > <groupId > org.springframework.cloud</groupId > <artifactId > spring-cloud-starter-netflix-eureka-server</artifactId > </dependency >
1 2 3 4 5 <dependency > <groupId > org.springframework.cloud</groupId > <artifactId > spring-cloud-starter-netflix-eureka-client</artifactId > </dependency >
网站服务端单机/集群配置 1.当网站的当服务提供者为单机版时,网站的客户端controller中的服务端url可以写死,
actuator微服务信息完善 在网站服务端集群的每个配置文件中,在eureka下都要加上instance配置
1 2 3 4 eureka: instance: instance-id: payment8001 prefer-ip-address: true
服务发现Discovery 在controller中注入DiscoveryClient变量,写discovery方法,并在主启动类上加注解==@EnableDiscoveryClient==
关闭自我保护 在eureka服务端配置文件中加入:
1 2 3 4 server: enable-self-preservation: false eviction-interval-timer-in-ms: 2000
1 2 3 4 lease-renewal-interval-in-seconds: 1 lease-expiration-duration-in-seconds: 2
配置文件集合 eureka服务端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 server: port: 7001 eureka: instance: hostname: eureka7001.com client: register-with-eureka: false fetch-registry: false service-url: defaultZone: http://eureka7002.com:7002/eureka/
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 server: port: 8001 spring: application: name: cloud-payment-service datasource: type: com.alibaba.druid.pool.DruidDataSource driver-class-name: com.mysql.cj.jdbc.Driver url: jdbc:mysql://localhost:3306/db_test?useUnicode=true&characterEncoding=utf-8&userSSL-false username: root password: xxxxxx mybatis: mapper-locations: classpath:mapper/*.xml type-aliases-package: com.spongehah.springcloud.entities eureka: client: register-with-eureka: true fetchRegistry: true service-url: defaultZone: http://eureka7001.com:7001/eureka,http://eureka7002.com:7002/eureka instance: instance-id: payment8001 prefer-ip-address: true
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 server: port: 8080 spring: application: name: cloud-order-service eureka: client: register-with-eureka: true fetchRegistry: true service-url: defaultZone: http://eureka7001.com:7001/eureka,http://eureka7002.com:7002/eureka
Zookeeper(CP) 关闭linux防火墙后,启动zookeeper服务器
1 2 3 4 5 6 cd /opt/module/zookeeper-3.5.7/bin ./zkServer.sh start #启动zk服务器 ./zkCli.sh #启动zk客户端 ./zkServer.sh stop ./zkServer.sh status quit #退出客户端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 <dependency > <groupId > org.springframework.cloud</groupId > <artifactId > spring-cloud-starter-zookeeper-discovery</artifactId > </dependency > <dependency > <groupId > org.apache.zookeeper</groupId > <artifactId > zookeeper</artifactId > <version > 3.5.7</version > </dependency >
1 2 3 4 5 6 7 8 9 10 server: port: 8004 /80(被占用用8080) spring: application: name: cloud-provider-payment/cloud-consumer-order cloud: zookeeper: connect-string: 192.168 .111 .100 :2181
主启动类 加上==@EnableDiscoveryClient==注解
controller: 和eureka中的网站服务端客户端controller类似,网站客户端记得给restTemplate加上==@LoadBalanced==
Consul(CP) windows启动consul:
通过网址 http://localhost:8500 访问consul界面
1 2 3 4 5 <dependency > <groupId > org.springframework.cloud</groupId > <artifactId > spring-cloud-starter-consul-discovery</artifactId > </dependency >
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 # server: port: 8006/8080 spring: application: name: consul-provider-payment/cloud-consumer-order # cloud: consul: host: localhost port: 8500 discovery: #hostname: service-name: ${spring.application.name}
主启动类 加上==@EnableDiscoveryClient==注解
controller: 和eureka中的网站服务端客户端controller类似,网站客户端记得给restTemplate加上==@LoadBalanced==
服务调用1 ==服务调用:选取服务提供者集群中的具体某一个实例==(从8001、8002中选择一个处理客户端的请求)
Ribbon ==restTemplate实现调用服务端,@LoadBalance实现负载均衡==
1 2 3 4 5 <dependency > <groupId > org.springframework.cloud</groupId > <artifactId > spring-cloud-starter-netflix-ribbon</artifactId > </dependency >
restTemplate的使用 ==@LoadBalanced==
1 2 3 4 5 6 7 8 9 @Configuration public class ApplicationContextConfig { @Bean public RestTemplate getRestTemplate () { return new RestTemplate (); } }
public static final String PAYMENT_URL = “http://CLOUD-PAYMENT-SERVICE “;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 @RestController @Slf4j public class OrderController { public static final String PAYMENT_URL = "http://CLOUD-PAYMENT-SERVICE" ; @Resource private RestTemplate restTemplate; @GetMapping("/consumer/payment/create") public CommonResult<Payment> create (Payment payment) { return restTemplate.postForObject(PAYMENT_URL + "/payment/create" ,payment,CommonResult.class); } }
1 2 3 4 5 6 7 8 9 10 @Configuration public class MySelfRule { @Bean public IRule myRule () { return new RandomRule (); } }
1 2 3 4 5 6 7 8 9 10 @SpringBootApplication @EnableEurekaClient @RibbonClient(name = "CLOUD-PAYMENT-SERVICE",configuration=MySelfRule.class) public class OrderMain80 { public static void main (String[] args) { SpringApplication.run(OrderMain80.class,args); } }
负载均衡算法原理 轮询算法原理:
1 2 3 4 5 6 7 @GetMapping(value = "/payment/lb") public String getPaymentLB () { return serverPort; }
1 2 3 4 5 public interface LoadBalancer { ServiceInstance instances (List<ServiceInstance> serviceInstances) ; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 @Component public class MyLB implements LoadBalancer { private AtomicInteger atomicInteger = new AtomicInteger (0 ); public final int getAndIncrement () { int current; int next; do { current = this .atomicInteger.get(); next = current >= 2147483647 ? 0 : current + 1 ; } while (!this .atomicInteger.compareAndSet(current, next)); System.out.println("*****next: " +next); return next; } @Override public ServiceInstance instances (List<ServiceInstance> serviceInstances) { int index = getAndIncrement() % serviceInstances.size(); return serviceInstances.get(index); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 @Resource private LoadBalancer loadBalancer; @Resource private DiscoveryClient discoveryClient; @GetMapping("/consumer/payment/lb") public String getPaymentLB () { List<ServiceInstance> instances = discoveryClient.getInstances("CLOUD-PAYMENT-SERVICE" ); if (instances == null || instances.size() <= 0 ){ return null ; } ServiceInstance serviceInstance = loadBalancer.instances(instances); URI uri = serviceInstance.getUri(); return restTemplate.getForObject(uri + "/payment/lb" ,String.class); }
服务调用2 OpenFeign(集成Ribbon,简化开发) Feign是一个声明式WebService客户端。使用Feign能让编写Web Service客户端更加简单。
它的使用方法是定义一个服务接口然后在上面添加注解 。Feign也支持可拔插式的编码器和解码器。Spring Cloud
对Feign进行了封装,使其支持了Spring MVC标准注解和HttpMessageConverters。Feign可以与Eureka和Ribbon组合使用以支持负载均衡
==不再使用restTemplate,使用@EnableFejgnClients + @FeignClient(value = “服务注册名”)实现调用服务端==
OpenFeign的使用:(客户端) 1、新建module,该module是客户端,不再使用原客户端(原客户端是使用的restTemplate)
1 2 3 4 5 6 7 8 9 10 11 <dependency > <groupId > org.springframework.cloud</groupId > <artifactId > spring-cloud-starter-openfeign</artifactId > </dependency > <dependency > <groupId > org.springframework.cloud</groupId > <artifactId > spring-cloud-starter-netflix-eureka-client</artifactId > </dependency >
1 2 3 4 5 6 7 8 server: port: 80 eureka: client: register-with-eureka: false //这里没有把自己注册到eureka服务中心 service-url: defaultZone: http://eureka7001.com:7001/eureka/,http://eureka7002.com:7002/eureka/
1 2 3 4 5 6 7 8 9 @SpringBootApplication @EnableFeignClients public class OrderFeignMain80 { public static void main (String[] args) { SpringApplication.run(OrderFeignMain80.class,args); } }
1 2 3 4 5 6 7 @Component @FeignClient(value = "CLOUD-PAYMENT-SERVICE") public interface PaymentFeignService { @GetMapping("/payment/get/{id}") public CommonResult<Payment> getPaymentById (@PathVariable("id") Long id) ; }
1 2 3 4 5 6 7 8 9 10 11 12 @RestController @Slf4j public class OrderFeignController { @Resource private PaymentFeignService paymentFeignService; @GetMapping("/consumer/payment/get/{id}") public CommonResult<Payment> getPaymentById (@PathVariable("id") Long id) { return paymentFeignService.getPaymentById(id); } }
超时控制 ==默认Feign客户端只等待一秒钟==,但是服务端处理需要超过1秒钟,导致Feign客户端不想等待了,直接返回报错。
1 2 3 4 5 6 7 8 @GetMapping(value = "/payment/feign/timeout") public String paymentFeignTimeOut () { System.out.println("*****paymentFeignTimeOut from port: " +serverPort); try { TimeUnit.SECONDS.sleep(3 ); } catch (InterruptedException e) { e.printStackTrace(); } return serverPort; }
1 2 @GetMapping(value = "/payment/feign/timeout") public String paymentFeignTimeOut () ;
1 2 3 4 @GetMapping("/consumer/payment/feign/timeout") public String paymentFeignTimeOut () { return paymentFeignService.paymentFeignTimeOut(); }
测试: 测试错误:Read timed out executing GET http://CLOUD-PAYMENT-SERVICE/payment/feign/timeout
1 2 3 4 5 6 7 8 9 10 11 12 13 14 feign: client: config: default: read-timeout: 5000 connect-timeout: 5000
OpenFeign日志打印功能 Feign 提供了日志打印功能,我们可以通过配置来调整日志级别,从而了解 Feign 中 Http 请求的细节。
HEADERS:除了 BASIC 中定义的信息之外,还有请求和响应的头信息;
FULL:除了 HEADERS 中定义的信息之外,还有请求和响应的正文及元数据。
1 2 3 4 5 6 7 8 9 @Configuration public class FeignConfig { @Bean Logger.Level feignLoggerLevel () { return Logger.Level.FULL; } }
1 2 3 4 logging: level: com.spongehah.springcloud.service.PaymentFeignService: debug
服务降级、熔断、限流 分布式系统面临的问题
服务雪崩 多个微服务之间调用的时候,假设微服务A调用微服务B和微服务C,微服务B和微服务C又调用其它的微服务,这就是所谓的“扇出” 。如果扇出的链路上某个微服务的调用响应时间过长或者不可用,对微服务A的调用就会占用越来越多的系统资源,进而引起系统崩溃,所谓的“雪崩效应”.
对于高流量的应用来说,单一的后端依赖可能会导致所有服务器上的所有资源都在几秒钟内饱和。比失败更糟糕的是,这些应用程序还可能导致服务之间的延迟增加,备份队列,线程和其他系统资源紧张,导致整个系统发生更多的级联故障。这些都表示需要对故障和延迟进行隔离和管理,以便单个依赖关系的失败,不能取消整个应用程序或系统。 所以, 通常当你发现一个模块下的某个实例失败后,这时候这个模块依然还会接收流量,然后这个有问题的模块还调用了其他的模块,这样就会发生级联故障,或者叫雪崩。
Hystrix Hystrix是一个用于处理分布式系统的延迟 和容错 的开源库,在分布式系统里,许多依赖不可避免的会调用失败,比如超时、异常等,Hystrix能够保证在一个依赖出问题的情况下,不会导致整体服务失败,避免级联故障,以提高分布式系统的弹性。
“断路器”本身是一种开关装置,当某个服务单元发生故障之后,通过断路器的故障监控(类似熔断保险丝),向调用方返回一个符合预期的、可处理的备选响应(FallBack),而不是长时间的等待或者抛出调用方无法处理的异常 ,这样就保证了服务调用方的线程不会被长时间、不必要地占用 ,从而避免了故障在分布式系统中的蔓延,乃至雪崩。
Hystrix新建案例模块 1、新建cloud-provider-hystrix-payment8001
1 2 3 4 5 6 7 8 9 10 <dependency > <groupId > org.springframework.cloud</groupId > <artifactId > spring-cloud-starter-netflix-hystrix</artifactId > </dependency > <dependency > <groupId > org.springframework.cloud</groupId > <artifactId > spring-cloud-starter-netflix-eureka-client</artifactId > </dependency >
1 2 3 4 5 6 7 8 9 10 11 12 13 14 server: port: 8001 spring: application: name: cloud-provider-hystrix-payment eureka: client: register-with-eureka: true fetch-registry: true service-url: defaultZone: http://eureka7001.com:7001/eureka
1 2 3 4 5 6 7 8 @SpringBootApplication @EnableEurekaClient public class PaymentHystrixMain8001 { public static void main (String[] args) { SpringApplication.run(PaymentHystrixMain8001.class, args); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 @Service public class PaymentServiceImpl implements PaymentService { @Override public String paymentInfo_OK (Integer id) { return "线程池:" +Thread.currentThread().getName()+"paymentInfo_OK,id: " +id+"\t" +"O(∩_∩)O" ; } @Override public String paymentInfo_TimeOut (Integer id) { int timeout = 3 ; try { TimeUnit.SECONDS.sleep(timeout); } catch (InterruptedException e) { e.printStackTrace(); } return "线程池:" +Thread.currentThread().getName()+"paymentInfo_TimeOut,id: " +id+"\t" +"O(∩_∩)O,耗费(秒) " + timeout; } } @RestController @Slf4j public class PaymentController { @Resource private PaymentService paymentService; @Value("${server.port}") private String serverPort; @GetMapping("/payment/hystrix/ok/{id}") public String paymentInfo_OK (@PathVariable("id") Integer id) { String result = paymentService.paymentInfo_OK(id); log.info("****result: " +result); return result; } @GetMapping("/payment/hystrix/timeout/{id}") public String paymentInfo_TimeOut (@PathVariable("id") Integer id) throws InterruptedException { String result = paymentService.paymentInfo_TimeOut(id); log.info("****result: " +result); return result; } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 <dependency > <groupId > org.springframework.cloud</groupId > <artifactId > spring-cloud-starter-openfeign</artifactId > </dependency > <dependency > <groupId > org.springframework.cloud</groupId > <artifactId > spring-cloud-starter-netflix-hystrix</artifactId > </dependency > <dependency > <groupId > org.springframework.cloud</groupId > <artifactId > spring-cloud-starter-netflix-eureka-client</artifactId > </dependency >
1 2 3 4 5 6 7 8 9 server: port: 8080 eureka: client: register-with-eureka: false service-url: defaultZone: http://eureka7001.com:7001/eureka/
1 2 3 4 5 6 7 8 @SpringBootApplication @EnableFeignClients public class OrderHystrixMain8080 { public static void main (String[] args) { SpringApplication.run(OrderHystrixMain8080.class, args); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 @Component @FeignClient(value = "CLOUD-PROVIDER-HYSTRIX-PAYMENT") public interface PaymentHystrixService { @GetMapping("/payment/hystrix/ok/{id}") public String paymentInfo_OK (@PathVariable("id") Integer id) ; @GetMapping("/payment/hystrix/timeout/{id}") public String paymentInfo_TimeOut (@PathVariable("id") Integer id) ; } @RestController @Slf4j public class OrderHystrixController { @Resource private PaymentHystrixService paymentHystrixService; @GetMapping("/consumer/payment/hystrix/ok/{id}") public String paymentInfo_OK (@PathVariable("id") Integer id) { String result = paymentHystrixService.paymentInfo_OK(id); return result; } @GetMapping("/consumer/payment/hystrix/timeout/{id}") public String paymentInfo_TimeOut (@PathVariable("id") Integer id) { String result = paymentHystrixService.paymentInfo_TimeOut(id); return result; } }
服务端服务降级 给服务端8001主启动类添加注解==@EnableCircuitBreaker==
给服务端8001 service层需要服务降级的方法添加注解==@HystrixCommand==
1 2 3 4 5 6 7 8 9 @SpringBootApplication @EnableEurekaClient @EnableCircuitBreaker public class PaymentHystrixMain8001 { public static void main (String[] args) { SpringApplication.run(PaymentHystrixMain8001.class, args); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 @HystrixCommand(fallbackMethod = "paymentInfo_TimeOutHandler",commandProperties = { //指定服务降级fallback方法 @HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds",value="3000") //指定超时上限(ms) }) @Override public String paymentInfo_TimeOut (Integer id) { int timeout = 3 ; try { TimeUnit.SECONDS.sleep(timeout); } catch (InterruptedException e) { e.printStackTrace(); } return "线程池:" +Thread.currentThread().getName()+"paymentInfo_TimeOut,id: " +id+"\t" +"O(∩_∩)O,耗费(秒) " + timeout; } public String paymentInfo_TimeOutHandler (Integer id) { return "/(ㄒoㄒ)/8001调用支付接口超时或异常:\t" + "\t当前线程池名字" + Thread.currentThread().getName(); }
客户端降级(降级一般是客户端) 改yaml:==坑!!这里记得配置feign的超时控制时间,避免覆盖fallback方法设定的时间==
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 server: port: 8080 eureka: client: register-with-eureka: false service-url: defaultZone: http://eureka7001.com:7001/eureka/ #设置Feign的超时控制,避免默认时长覆盖@HystrixProperty注解的设定时长 feign: client: config: default: connect-timeout: 5000 read-timeout: 5000 # #开启@FeignClient注解的Hystrix功能,能指定fallback类 # hystrix: # enabled: true ##设置@FeignClient注解的Hystrix功能的全局超时时间 #hystrix: # command: # default: # execution: # isolation: # thread: # timeoutInMilliseconds: 5000
给服务端80/8080 controller/service层需要服务降级的方法添加注解==@HystrixCommand==
1 2 3 4 5 6 7 8 @SpringBootApplication @EnableFeignClients @EnableHystrix public class OrderHystrixMain8080 { public static void main (String[] args) { SpringApplication.run(OrderHystrixMain8080.class,args); } }
1 2 3 4 5 6 7 8 9 10 11 12 @GetMapping("/consumer/payment/hystrix/timeout/{id}") @HystrixCommand(fallbackMethod = "paymentTimeOutFallbackMethod",commandProperties = { //这里设置的时间被Feign/Ribbon的客户端默认最长等待时间覆盖,还需要修改配置文件yaml @HystrixProperty(name="execution.isolation.thread.timeoutInMilliseconds",value="5000") }) public String paymentInfo_TimeOut (@PathVariable("id") Integer id) { String result = paymentHystrixService.paymentInfo_TimeOut(id); return result; } public String paymentTimeOutFallbackMethod (@PathVariable("id") Integer id) { return "我是消费者80,对方支付系统繁忙请10秒钟后再试或者自己运行出错请检查自己,o(╥﹏╥)o" ; }
全局fallback方法配置(减少代码膨胀) ==@DefaultProperties(defaultFallback = “”)==
1:1 每个方法配置一个服务降级方法,技术上可以,实际上傻X
1:N 除了个别重要核心业务有专属,其它普通的可以通过@DefaultProperties(defaultFallback = “”) 统一跳转到统一处理结果页面
在对应业务类上加注解@DefaultProperties(defaultFallback = “”)指定全局通用fallback方法,再在改业务类中写一个全局通用fallback方法,加上了@Hystrix注解但未指明独有fallback方法的方法就调用全局通用fallback方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 @RestController @Slf4j @DefaultProperties(defaultFallback = "payment_Global_FallbackMethod") public class OrderHystrixController { @Resource private PaymentHystrixService paymentHystrixService; @GetMapping("/consumer/payment/hystrix/ok/{id}") public String paymentInfo_OK (@PathVariable("id") Integer id) { String result = paymentHystrixService.paymentInfo_OK(id); return result; } @GetMapping("/consumer/payment/hystrix/timeout/{id}") @HystrixCommand public String paymentInfo_TimeOut (@PathVariable("id") Integer id) { String result = paymentHystrixService.paymentInfo_TimeOut(id); return result; } public String paymentTimeOutFallbackMethod (@PathVariable("id") Integer id) { return "我是消费者80,对方支付系统繁忙请10秒钟后再试或者自己运行出错请检查自己,o(╥﹏╥)o" ; } public String payment_Global_FallbackMethod () { return "Global异常处理信息,请稍后再试,/(ㄒoㄒ)/~~" ; } }
Service层使用Feign配置fallback类(实现解耦) ==当服务端down机时,能够返回fallback类的fallback方法==
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 server: port: 8080 eureka: client: register-with-eureka: false service-url: defaultZone: http://eureka7001.com:7001/eureka/ feign: client: config: default: connect-timeout: 5000 read-timeout: 5000 hystrix: enabled: true hystrix: command: default: execution: isolation: thread: timeoutInMilliseconds: 5000
根据cloud-consumer-feign-hystrix-order80已经有的PaymentHystrixService接口, 重新新建一个类(PaymentFallbackService)实现该接口,统一为接口里面的方法进行异常处理
1 2 3 4 5 6 7 8 9 10 11 12 13 @Component public class PaymentFallbackService implements PaymentHystrixService { @Override public String paymentInfo_OK (Integer id) { return "-----PaymentFallbackservice fall back-paymentInfo_OK ,┭┮﹏┭┮" ; } @Override public String paymentInfo_TimeOut (Integer id) { return "-----PaymentFallbackservice fall back-paymentInfo_Timeout ,┭┮﹏┭┮" ; } }
1 2 3 4 5 6 7 8 9 10 11 @Component @FeignClient(value = "CLOUD-PROVIDER-HYSTRIX-PAYMENT",fallback = PaymentFallbackService.class) public interface PaymentHystrixService { @GetMapping("/payment/hystrix/ok/{id}") public String paymentInfo_OK (@PathVariable("id") Integer id) ; @GetMapping("/payment/hystrix/timeout/{id}") public String paymentInfo_TimeOut (@PathVariable("id") Integer id) ; }
服务熔断break(熔断是服务端) 熔断机制概述 熔断机制是应对雪崩效应的一种微服务链路保护机制。当扇出链路的某个微服务出错不可用或者响应时间太长时, 会进行服务的降级,进而熔断该节点微服务的调用,快速返回错误的响应信息。
==当检测到该节点微服务调用响应正常后,恢复调用链路。 ==
在Spring Cloud框架里,熔断机制通过Hystrix实现。Hystrix会监控微服务间调用的状况, 当失败的调用到一定阈值,缺省是5秒内20次调用失败,就会启动熔断机制。熔断机制的注解也是==@HystrixCommand。==
当满足一定的阀值的时候(默认10秒内超过20个请求次数)且 当失败率达到一定的时候(默认10秒内超过50%的请求失败)
1:再有请求调用的时候,将不会调用主逻辑,而是直接调用降级fallback 。通过断路器,实现了自动地发现错误并将降级逻辑切换为主逻辑,减少响应延迟的效果。
2:原来的主逻辑要如何恢复呢? 对于这一问题,hystrix也为我们实现了自动恢复功能 。 当断路器打开,对主逻辑进行熔断之后,hystrix会启动一个休眠时间窗,在这个时间窗内,降级逻辑是临时的成为主逻辑, 当休眠时间窗到期,断路器将进入半开状态,释放一次请求到原来的主逻辑上,如果此次请求正常返回,那么断路器将继续闭合, 主逻辑恢复,如果这次请求依然有问题,断路器继续进入打开状态,休眠时间窗重新计时。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 @Override @HystrixCommand(fallbackMethod = "paymentCircuitBreaker_fallback",commandProperties = { @HystrixProperty(name = "circuitBreaker.enabled",value = "true"),//是否开启断路器 @HystrixProperty(name = "circuitBreaker.requestVolumeThreshold",value = "10"),//请求次数 @HystrixProperty(name = "circuitBreaker.sleepWindowInMilliseconds",value = "10000"),//时间窗口期 @HystrixProperty(name = "circuitBreaker.errorThresholdPercentage",value = "60"),//失败率达到多少后跳闸 }) public String paymentCircuitBreaker (@PathVariable("id") Integer id) { if (id < 0 ) { throw new RuntimeException ("******id 不能负数" ); } String serialNumber = IdUtil.simpleUUID(); return Thread.currentThread().getName()+"\t" +"调用成功,流水号: " + serialNumber; } public String paymentCircuitBreaker_fallback (@PathVariable("id") Integer id) { return "id 不能负数,请稍后再试,/(ㄒoㄒ)/~~ id: " +id; }
1 2 3 4 5 6 7 @GetMapping("/payment/circuit/{id}") public String paymentCircuitBreaker (@PathVariable("id") Integer id) { String result = paymentService.paymentCircuitBreaker(id); log.info("****result: " +result); return result; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 @HystrixCommand(fallbackMethod = "str_fallbackMethod", groupKey = "strGroupCommand", commandKey = "strCommand", threadPoolKey = "strThreadPool", commandProperties = { // 设置隔离策略,THREAD 表示线程池 SEMAPHORE:信号池隔离 @HystrixProperty(name = "execution.isolation.strategy", value = "THREAD"), // 当隔离策略选择信号池隔离的时候,用来设置信号池的大小(最大并发数) @HystrixProperty(name = "execution.isolation.semaphore.maxConcurrentRequests", value = "10"), // 配置命令执行的超时时间 @HystrixProperty(name = "execution.isolation.thread.timeoutinMilliseconds", value = "10"), // 是否启用超时时间 @HystrixProperty(name = "execution.timeout.enabled", value = "true"), // 执行超时的时候是否中断 @HystrixProperty(name = "execution.isolation.thread.interruptOnTimeout", value = "true"), // 执行被取消的时候是否中断 @HystrixProperty(name = "execution.isolation.thread.interruptOnCancel", value = "true"), // 允许回调方法执行的最大并发数 @HystrixProperty(name = "fallback.isolation.semaphore.maxConcurrentRequests", value = "10"), // 服务降级是否启用,是否执行回调函数 @HystrixProperty(name = "fallback.enabled", value = "true"), // 是否启用断路器 @HystrixProperty(name = "circuitBreaker.enabled", value = "true"), // 该属性用来设置在滚动时间窗中,断路器熔断的最小请求数。例如,默认该值为 20 的时候, // 如果滚动时间窗(默认10秒)内仅收到了19个请求, 即使这19个请求都失败了,断路器也不会打开。 @HystrixProperty(name = "circuitBreaker.requestVolumeThreshold", value = "20"), // 该属性用来设置在滚动时间窗中,表示在滚动时间窗中,在请求数量超过 // circuitBreaker.requestVolumeThreshold 的情况下,如果错误请求数的百分比超过50, // 就把断路器设置为 "打开" 状态,否则就设置为 "关闭" 状态。 @HystrixProperty(name = "circuitBreaker.errorThresholdPercentage", value = "50"), // 该属性用来设置当断路器打开之后的休眠时间窗。 休眠时间窗结束之后, // 会将断路器置为 "半开" 状态,尝试熔断的请求命令,如果依然失败就将断路器继续设置为 "打开" 状态, // 如果成功就设置为 "关闭" 状态。 @HystrixProperty(name = "circuitBreaker.sleepWindowinMilliseconds", value = "5000"), // 断路器强制打开 @HystrixProperty(name = "circuitBreaker.forceOpen", value = "false"), // 断路器强制关闭 @HystrixProperty(name = "circuitBreaker.forceClosed", value = "false"), // 滚动时间窗设置,该时间用于断路器判断健康度时需要收集信息的持续时间 @HystrixProperty(name = "metrics.rollingStats.timeinMilliseconds", value = "10000"), // 该属性用来设置滚动时间窗统计指标信息时划分"桶"的数量,断路器在收集指标信息的时候会根据 // 设置的时间窗长度拆分成多个 "桶" 来累计各度量值,每个"桶"记录了一段时间内的采集指标。 // 比如 10 秒内拆分成 10 个"桶"收集这样,所以 timeinMilliseconds 必须能被 numBuckets 整除。否则会抛异常 @HystrixProperty(name = "metrics.rollingStats.numBuckets", value = "10"), // 该属性用来设置对命令执行的延迟是否使用百分位数来跟踪和计算。如果设置为 false, 那么所有的概要统计都将返回 -1。 @HystrixProperty(name = "metrics.rollingPercentile.enabled", value = "false"), // 该属性用来设置百分位统计的滚动窗口的持续时间,单位为毫秒。 @HystrixProperty(name = "metrics.rollingPercentile.timeInMilliseconds", value = "60000"), // 该属性用来设置百分位统计滚动窗口中使用 “ 桶 ”的数量。 @HystrixProperty(name = "metrics.rollingPercentile.numBuckets", value = "60000"), // 该属性用来设置在执行过程中每个 “桶” 中保留的最大执行次数。如果在滚动时间窗内发生超过该设定值的执行次数, // 就从最初的位置开始重写。例如,将该值设置为100, 滚动窗口为10秒,若在10秒内一个 “桶 ”中发生了500次执行, // 那么该 “桶” 中只保留 最后的100次执行的统计。另外,增加该值的大小将会增加内存量的消耗,并增加排序百分位数所需的计算时间。 @HystrixProperty(name = "metrics.rollingPercentile.bucketSize", value = "100"), // 该属性用来设置采集影响断路器状态的健康快照(请求的成功、 错误百分比)的间隔等待时间。 @HystrixProperty(name = "metrics.healthSnapshot.intervalinMilliseconds", value = "500"), // 是否开启请求缓存 @HystrixProperty(name = "requestCache.enabled", value = "true"), // HystrixCommand的执行和事件是否打印日志到 HystrixRequestLog 中 @HystrixProperty(name = "requestLog.enabled", value = "true"), }, threadPoolProperties = { // 该参数用来设置执行命令线程池的核心线程数,该值也就是命令执行的最大并发量 @HystrixProperty(name = "coreSize", value = "10"), // 该参数用来设置线程池的最大队列大小。当设置为 -1 时,线程池将使用 SynchronousQueue 实现的队列, // 否则将使用 LinkedBlockingQueue 实现的队列。 @HystrixProperty(name = "maxQueueSize", value = "-1"), // 该参数用来为队列设置拒绝阈值。 通过该参数, 即使队列没有达到最大值也能拒绝请求。 // 该参数主要是对 LinkedBlockingQueue 队列的补充,因为 LinkedBlockingQueue // 队列不能动态修改它的对象大小,而通过该属性就可以调整拒绝请求的队列大小了。 @HystrixProperty(name = "queueSizeRejectionThreshold", value = "5"), } ) public String strConsumer () { return "hello 2020" ; } public String str_fallbackMethod () { return "*****fall back str_fallbackMethod" ; }
Hystrix仪表盘HystrixDashboard9001 新建module : cloud-consumer-hystrix-dashboard9001
改pom: (actuator指标监控是各种监控的必备依赖)
1 2 3 4 5 6 7 8 <dependency > <groupId > org.springframework.cloud</groupId > <artifactId > spring-cloud-starter-netflix-hystrix-dashboard</artifactId > </dependency > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-actuator</artifactId > </dependency >
1 2 3 4 5 6 server: port: 9001 hystrix: dashboard: proxy-stream-allow-list: "*"
1 2 3 4 5 6 7 8 @SpringBootApplication @EnableHystrixDashboard public class HystrixDashboardMain9001 { public static void main (String[] args) { SpringApplication.run(HystrixDashboardMain9001.class, args); } }
1 2 3 4 <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-actuator</artifactId > </dependency >
1 2 3 4 5 6 7 8 9 10 11 12 13 14 @Bean public ServletRegistrationBean getServlet () { HystrixMetricsStreamServlet streamServlet = new HystrixMetricsStreamServlet (); ServletRegistrationBean registrationBean = new ServletRegistrationBean (streamServlet); registrationBean.setLoadOnStartup(1 ); registrationBean.addUrlMappings("/hystrix.stream" ); registrationBean.setName("HystrixMetricsStreamServlet" ); return registrationBean; }
1 2 3 4 5 6 management: endpoints: web: exposure: include: "hystrix.stream"
服务网关 Gateway Gateway是在Spring生态系统之上构建的API网关服务,基于Spring 5,Spring Boot 2和 Project Reactor等技术。 Gateway旨在提供一种简单而有效的方式来对API进行路由,以及提供一些强大的过滤器功能, 例如:熔断、限流、重试等
Spring Cloud Gateway的目标提供统一的路由方式且基于 Filter 链的方式提供了网关基本的功能,例如:安全,监控/指标,和限流。
SpringCloud Gateway 使用的Webflux中的reactor-netty响应式编程组件,底层使用了Netty通讯框架。
能干嘛? 反向代理、鉴权、流量控制、熔断、日志监控。。。。。。
Spring Cloud Gateway 具有如下特性:
基于Spring Framework 5, Project Reactor 和 Spring Boot 2.0 进行构建; 动态路由:能够匹配任何请求属性; 可以对路由指定 Predicate(断言)和 Filter(过滤器); 集成Hystrix的断路器功能; 集成 Spring Cloud 服务发现功能; 易于编写的 Predicate(断言)和 Filter(过滤器); 请求限流功能; 支持路径重写。
路由和断言配置(yaml): 1、新建module cloud-gateway-gateway9527:
1 2 3 4 5 6 7 8 9 10 <dependency > <groupId > org.springframework.cloud</groupId > <artifactId > spring-cloud-starter-gateway</artifactId > </dependency > <dependency > <groupId > org.springframework.cloud</groupId > <artifactId > spring-cloud-starter-netflix-eureka-client</artifactId > </dependency >
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 server: port: 9527 spring: application: name: cloud-gateway eureka: instance: hostname: cloud-gateway-service client: service-url: register-with-eureka: true fetch-registry: true defaultZone: http://eureka7001.com:7001/eureka
1 2 3 4 5 6 7 @SpringBootApplication @EnableEurekaClient public class GateWayMain9527 { public static void main (String[] args) { SpringApplication.run(GateWayMain9527.class, args); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 server: port: 9527 spring: application: name: cloud-gateway cloud: gateway: routes: - id: payment_routh uri: http://localhost:8001 predicates: - Path=/payment/get/** - id: payment_routh2 uri: http://localhost:8001 predicates: - Path=/payment/lb/** eureka: instance: hostname: cloud-gateway-service client: service-url: register-with-eureka: true fetch-registry: true defaultZone: http://eureka7001.com:7001/eureka
路由和断言配置(Bean注入): 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 @Configuration public class GateWayConfig { @Bean public RouteLocator customRouteLocator (RouteLocatorBuilder builder) { RouteLocatorBuilder.Builder routes = builder.routes(); routes.route("path_route_atguigu" , r -> r.path("/guonei" ).uri("http://news.baidu.com/guonei" )).build(); return routes.build(); } @Bean public RouteLocator customRouteLocator2 (RouteLocatorBuilder builder) { RouteLocatorBuilder.Builder routes = builder.routes(); routes.route("path_route_atguigu2" , r -> r.path("/guoji" ).uri("http://news.baidu.com/guoji" )).build(); return routes.build(); } }
通过微服务名实现动态路由 在上列基础上,修改yaml文件,实现动态路由和负载均衡:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 server: port: 9527 spring: application: name: cloud-gateway cloud: gateway: discovery: locator: enabled: true routes: - id: payment_routh uri: lb://cloud-payment-service predicates: - Path=/payment/get/** - id: payment_routh2 uri: lb://cloud-payment-service predicates: - Path=/payment/lb/** eureka: instance: hostname: cloud-gateway-service client: service-url: register-with-eureka: true fetch-registry: true defaultZone: http://eureka7001.com:7001/eureka
断言的具体使用: (一共九种)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 server: port: 9527 spring: application: name: cloud-gateway cloud: gateway: discovery: locator: enabled: true routes: - id: payment_routh uri: lb://cloud-payment-service predicates: - Path=/payment/get/** - id: payment_routh2 uri: lb://cloud-payment-service predicates: - Path=/payment/lb/** - After=2023-05-14T14:42:57.342+08:00[Asia/Shanghai] eureka: instance: hostname: cloud-gateway-service client: service-url: register-with-eureka: true fetch-registry: true defaultZone: http://eureka7001.com:7001/eureka
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 @Component @Slf4j public class MyLogGateWayFilter implements GlobalFilter , Ordered { @Override public Mono<Void> filter (ServerWebExchange exchange, GatewayFilterChain chain) { log.info("*************come in MyLogGateWayFilter: " + new Date ()); String uname = exchange.getRequest().getQueryParams().getFirst("uname" ); if (uname == null ) { log.info("*******用户为null,非法用户,┭┮﹏┭┮" ); exchange.getResponse().setStatusCode(HttpStatus.NOT_ACCEPTABLE); return exchange.getResponse().setComplete(); } return chain.filter(exchange); } @Override public int getOrder () { return 0 ; } }
服务配置 Config
config服务端配置与测试 准备工作:
2、由上一步获得刚新建的git地址 git@github.com :spongehah/springcloud-config.git
3、本地硬盘目录上新建git仓库并clone git clone git@github.com :spongehah/springcloud-config.git(不行使用http)
新建Module模块cloud-config-center-3344 它即为Cloud的配置中心模块cloudConfig Center
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 <dependency > <groupId > org.springframework.cloud</groupId > <artifactId > spring-cloud-config-server</artifactId > <exclusions > <exclusion > <groupId > com.jcraft</groupId > <artifactId > jsch</artifactId > </exclusion > </exclusions > </dependency > <dependency > <groupId > com.github.mwiede</groupId > <artifactId > jsch</artifactId > <version > 0.2.0</version > </dependency > <dependency > <groupId > org.springframework.cloud</groupId > <artifactId > spring-cloud-starter-netflix-eureka-client</artifactId > </dependency >
1 2 3 4 5 6 7 @Configuration public class JschConfig { static { JSch.setConfig("signature.rsa" , "com.jcraft.jsch.jce.SignatureRSA" ); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 server: port: 3344 spring: application: name: cloud-config-center cloud: config: server: git: uri: git@github.com:spongehah/springcloud-config.git search-paths: - springcloud-config label: main eureka: client: service-url: defaultZone: http://localhost:7001/eureka
1 2 3 4 5 6 7 @SpringBootApplication @EnableConfigServer public class ConfigCenterMain3344 { public static void main (String[] args) { SpringApplication.run(ConfigCenterMain3344.class, args); } }
windows下修改hosts文件,增加映射: config-3344.com(可选)
配置读取规则 1、/{label}/{application}-{profile}.yml eg:http://config-3344.com:3344/master/config-dev.yml
2、/{application}-{profile}.yml eg:http://config-3344.com:3344/config-dev.yml
3、/{application}/{profile}[/{label}] eg:http://config-3344.com:3344/config/dev/master
config客户端配置与测试 bootstrap.yml :
applicaiton.yml是用户级的资源配置项 bootstrap.yml是系统级的,优先级更加高
Spring Cloud会创建一个“Bootstrap Context”,作为Spring应用的Application Context
的父上下文 。初始化的时候,Bootstrap Context
负责从外部源 加载配置属性并解析配置。这两个上下文共享一个从外部获取的Environment
属性有高优先级,默认情况下,它们不会被本地配置覆盖。 Bootstrap context
和Application Context
文件,保证Bootstrap Context
和Application Context
要将Client模块下的application.yml文件改为bootstrap.yml,这是很关键的, 因为bootstrap.yml是比application.yml先加载的。bootstrap.yml优先级高于application.yml
1 2 3 4 5 6 7 8 <dependency > <groupId > org.springframework.cloud</groupId > <artifactId > spring-cloud-starter-config</artifactId > </dependency > <dependency > <groupId > org.springframework.cloud</groupId > <artifactId > spring-cloud-starter-netflix-eureka-client</artifactId > </dependency >
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 server: port: 3355 spring: application: name: config-client cloud: config: label: main name: config profile: dev uri: http://localhost:3344 eureka: client: service-url: defaultZone: http://localhost:7001/eureka
1 2 3 4 5 6 7 @SpringBootApplication @EnableEurekaClient public class ConfigClientMain3355 { public static void main (String[] args) { SpringApplication.run(ConfigClientMain3355.class, args); } }
1 2 3 4 5 6 7 8 9 10 11 @RestController public class ConfigClientController { @Value("${config.info}") private String configInfo; @GetMapping("/configInfo") public String getConfigInfo () { return configInfo; } }
config客户端之动态刷新 当在github修改配置文件后,config-server会立即响应,而config-client只能重启或者重新加载才会更新
1 2 3 4 <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-actuator</artifactId > </dependency >
1 2 3 4 5 6 management: endpoints: web: exposure: include: "*"
1 2 3 4 5 6 7 8 9 10 11 12 @RestController @RefreshScope public class ConfigClientController { @Value("${config.info}") private String configInfo; @GetMapping("/configInfo") public String getConfigInfo () { return configInfo; } }
curl -X POST “http://localhost:3355/actuator/refresh “
服务总线 什么是总线 在微服务架构的系统中,通常会使用轻量级的消息代理来构建一个共用的消息主题,并让系统中所有微服务实例都连接上来。由于该主题中产生的消息会被所有实例监听和消费,所以称它为消息总线。在总线上的各个实例,都可以方便地广播一些需要让其他连接在该主题上的实例都知道的消息。
Bus Spring Cloud Bus能管理和传播分布式系统间的消息,就像一个分布式执行器,可用于广播状态更改、事件推送等,也可以当作微服务间的通信通道。
基本原理 ConfigClient实例都监听MQ中同一个topic(默认是springCloudBus)。当一个服务刷新数据的时候,它会把这个信息放入到Topic中,这样其它监听同一Topic的服务就能得到通知,然后去更新自身的配置。
Spring Cloud Bus 配合 Spring Cloud Config 使用可以实现配置的动态刷新。
Spring Cloud Bus是用来将分布式系统的节点与轻量级消息系统链接起来的框架, 它整合了Java的事件处理机制和消息中间件的功能。 Spring Clud Bus目前支持RabbitMQ和Kafka
Erl+RabbitMQ环境配置 安装Erlang,下载地址:http://erlang.org/download/otp_win64_21.3.exe
进入RabbitMQ安装目录下的sbin目录,输入以下命令启动管理功能和可视化插件:rabbitmq-plugins enable rabbitmq_management
访问地址查看是否安装成功:http://localhost:15672/ 输入账号密码并登录:guest guest
SpringCloud Bus动态刷新全局广播 设计思想:
1 2 3 4 5 6 7 8 <dependency > <groupId > org.springframework.cloud</groupId > <artifactId > spring-cloud-starter-config</artifactId > </dependency > <dependency > <groupId > org.springframework.cloud</groupId > <artifactId > spring-cloud-starter-netflix-eureka-client</artifactId > </dependency >
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 server: port: 3366 spring: application: name: config-client cloud: config: label: main name: config profile: dev uri: http://localhost:3344 eureka: client: service-url: defaultZone: http://localhost:7001/eureka management: endpoints: web: exposure: include: "*"
1 2 3 4 5 6 7 8 @SpringBootApplication @EnableEurekaClient public class ConfigClientMain3366 { public static void main (String[] args) { SpringApplication.run(ConfigClientMain3366.class, args); } }
1 2 3 4 5 6 7 8 9 10 11 12 @RestController @RefreshScope public class ConfigClientController { @Value("${config.info}") private String configInfo; @GetMapping("/configInfo") public String getConfigInfo () { return configInfo; } }
7、给config-server3344 和 config-client3355、3366添加mq-bus依赖:
1 2 3 4 5 <dependency > <groupId > org.springframework.cloud</groupId > <artifactId > spring-cloud-starter-bus-amqp</artifactId > </dependency >
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 server: port: 3344 spring: application: name: cloud-config-center cloud: config: server: git: uri: git@gitee.com:zhaoyingjiehaha/springcloud-config.git search-paths: - springcloud-config default-label: main label: main rabbitmq: host: localhost port: 5672 username: guest password: guest eureka: client: service-url: defaultZone: http://localhost:7001/eureka management: endpoints: web: exposure: include: 'bus-refresh'
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 server: port: 3355 /3366 spring: application: name: config-client cloud: config: label: main name: config profile: dev uri: http://localhost:3344 rabbitmq: host: localhost port: 5672 username: guest password: guest eureka: client: service-url: defaultZone: http://localhost:7001/eureka management: endpoints: web: exposure: include: "*"
10、全局广播:git上修改配置文件后,给config-server发送post请求:curl -X POST “http://localhost:3344/actuator/bus-refresh “
SpringCloud Bus动态刷新定点通知 指定具体某一个实例生效而不是全部
我们这里以刷新运行在3355端口上的config-client为例:curl -X POST “http://localhost:3344/actuator/bus-refresh/config-client:3355 “
服务优化 Stream消息驱动 什么是SpringCloudStream :屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型 官方定义 Spring Cloud Stream 是一个构建消息驱动微服务的框架。
应用程序通过 inputs 或者 outputs 来与 Spring Cloud Stream中binder对象交互。 通过我们配置来binding(绑定) ,而 Spring Cloud Stream 的 binder对象负责与消息中间件交互。 所以,我们只需要搞清楚如何与 Spring Cloud Stream 交互就可以方便使用消息驱动的方式。
通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动。 Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。
通过定义绑定器作为中间层,完美地实现了应用程序与消息中间件细节之间的隔离 。 通过向应用程序暴露统一的Channel通道,使得应用程序不需要再考虑各种不同的消息中间件实现。
Spring Cloud Stream标准流程套路:
Source和Sink:简单的可理解为参照对象是Spring Cloud Stream自身,从Stream发布消息就是输出,接受消息就是输入。
消息驱动之生产者配置 先搭建好kafka或者rabbitMQ的环境
1、新建module cloud-stream-rabbitmq-provider8801
1 2 3 4 5 6 7 8 <dependency > <groupId > org.springframework.cloud</groupId > <artifactId > spring-cloud-starter-stream-rabbit</artifactId > </dependency > <dependency > <groupId > org.springframework.cloud</groupId > <artifactId > spring-cloud-starter-netflix-eureka-client</artifactId > </dependency >
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 server: port: 8801 spring: application: name: cloud-stream-provider cloud: stream: binders: defaultRabbit: type: rabbit environment: spring: rabbitmq: host: localhost port: 5672 username: guest password: guest bindings: output: destination: studyExchange content-type: application/json binder: defaultRabbit eureka: client: service-url: defaultZone: http://localhost:7001/eureka instance: lease-renewal-interval-in-seconds: 2 lease-expiration-duration-in-seconds: 5 instance-id: send-8801.com prefer-ip-address: true
1 2 3 4 5 6 7 @SpringBootApplication public class StreamMQMain8801 { public static void main (String[] args) { SpringApplication.run(StreamMQMain8801.class, args); } }
1 2 3 public interface IMessageProvider { public String send () ; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 package com.spongehah.springcloud.service.impl;import com.spongehah.springcloud.service.IMessageProvider;import org.springframework.cloud.stream.annotation.EnableBinding;import org.springframework.cloud.stream.messaging.Source;import org.springframework.integration.support.MessageBuilder;import org.springframework.messaging.MessageChannel;import javax.annotation.Resource;import java.util.UUID;@EnableBinding(Source.class) public class MessageProviderImpl implements IMessageProvider { @Resource private MessageChannel output; @Override public String send () { String serial = UUID.randomUUID().toString(); this .output.send(MessageBuilder.withPayload(serial).build()); System.out.println("***serial: " + serial); return serial; } }
1 2 3 4 5 6 7 8 9 10 11 @RestController public class SendMessageController { @Resource private IMessageProvider messageProvider; @GetMapping("/sendMessage") public String sendMessage () { return messageProvider.send(); } }
消息驱动之消费者配置 1、新建module cloud-stream-rabbitmq-consumer8802
1 2 3 4 5 6 7 8 <dependency > <groupId > org.springframework.cloud</groupId > <artifactId > spring-cloud-starter-stream-rabbit</artifactId > </dependency > <dependency > <groupId > org.springframework.cloud</groupId > <artifactId > spring-cloud-starter-netflix-eureka-client</artifactId > </dependency >
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 server: port: 8802 spring: application: name: cloud-stream-consumer cloud: stream: binders: defaultRabbit: type: rabbit environment: spring: rabbitmq: host: localhost port: 5672 username: guest password: guest bindings: input: destination: studyExchange content-type: application/json binder: defaultRabbit eureka: client: service-url: defaultZone: http://localhost:7001/eureka instance: lease-renewal-interval-in-seconds: 2 lease-expiration-duration-in-seconds: 5 instance-id: receive-8802.com prefer-ip-address: true
1 2 3 4 5 6 7 @SpringBootApplication public class StreamMQMain8802 { public static void main (String[] args) { SpringApplication.run(StreamMQMain8802.class, args); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 package com.spongehah.springcloud.listener;import org.springframework.beans.factory.annotation.Value;import org.springframework.cloud.stream.annotation.EnableBinding;import org.springframework.cloud.stream.annotation.StreamListener;import org.springframework.cloud.stream.messaging.Sink;import org.springframework.messaging.Message;import org.springframework.stereotype.Component;@Component @EnableBinding(Sink.class) public class ReceiveMessageListener { @Value("${server.port}") private String serverPort; @StreamListener(Sink.INPUT) public void input (Message<String> message) { System.out.println("消费者1号,------->接收到的消息:" + message.getPayload()+"\t port: " +serverPort); } }
分组消费与持久化 ==重复消费问题:==
2、http://localhost:8801/sendMessage 目前是8802/8803同时都收到了,存在重复消费问题
==微服务应用放置于同一个group中,就能够保证消息只会被其中一个应用消费一次。== 不同的组是可以消费的,同一个组内会发生竞争关系,只有其中一个可以消费。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 server: port: 8802 spring: application: name: cloud-stream-consumer cloud: stream: binders: defaultRabbit: type: rabbit environment: spring: rabbitmq: host: localhost port: 5672 username: guest password: guest bindings: input: destination: studyExchange content-type: application/json binder: defaultRabbit group: spongehahA eureka: client: service-url: defaultZone: http://localhost:7001/eureka instance: lease-renewal-interval-in-seconds: 2 lease-expiration-duration-in-seconds: 5 instance-id: receive-8802.com prefer-ip-address: true
Sleuth分布式请求链路跟踪 在微服务框架中,一个由客户端发起的请求在后端系统中会经过多个不同的的服务节点调用来协同产生最后的请求结果,每一个前段请求都会形成一条复杂的分布式服务调用链路,链路中的任何一环出现高延时或错误都会引起整个请求最后的失败。
Spring Cloud Sleuth提供了一套完整的服务跟踪的解决方案,在分布式系统中提供追踪解决方案并且兼容支持了zipkin
一条链路通过Trace Id唯一标识,Span标识发起的请求信息,各span通过parent id 关联起来
zipkin环境安装 https://dl.bintray.com/openzipkin/maven/io/zipkin/java/zipkin-server/下载jar包
目录下找到jar包,使用java -jar命令运行jar包
案例 在cloud-provider-payment8001中:
1 2 3 4 5 <dependency > <groupId > org.springframework.cloud</groupId > <artifactId > spring-cloud-starter-zipkin</artifactId > </dependency >
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 spring: application: name: cloud-payment-service zipkin: base-url: http://localhost:9411 sleuth: sampler: probability: 1 datasource: type: com.alibaba.druid.pool.DruidDataSource driver-class-name: com.mysql.cj.jdbc.Driver url: jdbc:mysql://localhost:3306/db_test?useUnicode=true&characterEncoding=utf-8&userSSL-false username: root password: 123456
1 2 3 4 @GetMapping("/payment/zipkin") public String paymentZipkin () { return "hi ,i'am paymentzipkin server fall back,welcome to atguigu,O(∩_∩)O哈哈~" ; }
1 2 3 4 5 <dependency > <groupId > org.springframework.cloud</groupId > <artifactId > spring-cloud-starter-zipkin</artifactId > </dependency >
1 2 3 4 5 6 7 8 9 spring: application: name: cloud-order-service zipkin: base-url: http://localhost:9411 sleuth: sampler: probability: 1
1 2 3 4 5 6 @GetMapping("/consumer/payment/zipkin") public String paymentZipkin () { String result = restTemplate.getForObject("http://localhost:8001" +"/payment/zipkin/" , String.class); return result; }
Nacos 版本参考目录 SpringcloudAlibaba 处的表格对照,这里选用的是2.1.0
Nacos就是注册中心 + 配置中心的组合
Nacos = Eureka+Config +Bus
(较新版本的nacos默认启动方式为集群cluster模式,运行前先修改startup.cmd文件,将mode修改为standalone,或者启动命令后加加上-m standalone)且安装目录不能有中文
命令运行成功后直接访问http://localhost:8848/nacos 默认账号密码都是nacos
Nacos作为注册中心(AP) 网站服务端 1、新建module: cloudalibaba-provider-payment9001
1 2 3 4 5 6 7 8 <dependency > <groupId > com.alibaba.cloud</groupId > <artifactId > spring-cloud-alibaba-dependencies</artifactId > <version > ${spring-cloud-alibaba.version}</version > <type > pom</type > <scope > import</scope > </dependency >
1 2 3 4 5 <dependency > <groupId > com.alibaba.cloud</groupId > <artifactId > spring-cloud-starter-alibaba-nacos-discovery</artifactId > </dependency >
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 server: port: 9001 spring: application: name: nacos-payment-provider cloud: nacos: discovery: server-addr: localhost:8848 management: endpoints: web: exposure: include: '*'
1 2 3 4 5 6 7 8 @SpringBootApplication @EnableDiscoveryClient public class PaymentMain9001 { public static void main (String[] args) { SpringApplication.run(PaymentMain9001.class, args); } }
1 2 3 4 5 6 7 8 9 10 11 @RestController public class PaymentController { @Value("${server.port}") private String serverPort; @GetMapping(value = "/payment/nacos/{id}") public String getPayment (@PathVariable("id") Integer id) { return "nacos registry, serverPort: " + serverPort+"\t id" +id; } }
6、新建9002构成网站服务端集群,因为配置和9001完全一样,故使用copy configuration复制一个相同的环境使用虚拟端口映射,使用9002号端口进行启动
name:PaymentMain9002 CopyOf 9001
VM options:-DServer.port=9002
网站客户端/消费端ribbon版(使用restRemplate) 1、新建module: cloudalibaba-consumer-nacos-order83
1 2 3 4 5 <dependency > <groupId > com.alibaba.cloud</groupId > <artifactId > spring-cloud-starter-alibaba-nacos-discovery</artifactId > </dependency >
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 server: port: 83 spring: application: name: nacos-order-consumer cloud: nacos: discovery: server-addr: localhost:8848 service-url: nacos-user-service: http://nacos-payment-provider
1 2 3 4 5 6 7 8 @SpringBootApplication @EnableDiscoveryClient public class OrderNacosMain83 { public static void main (String[] args) { SpringApplication.run(OrderNacosMain83.class, args); } }
1 2 3 4 5 6 7 8 9 @Configuration public class ApplicationContextConfig { @Bean @LoadBalanced public RestTemplate getRestTemplate () { return new RestTemplate (); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @RestController public class OrderNacosController { @Resource private RestTemplate restTemplate; @Value("${service-url.nacos-user-service}") private String serverURL; @GetMapping("/consumer/payment/nacos/{id}") public String paymentInfo (@PathVariable("id") Long id) { return restTemplate.getForObject(serverURL+"/payment/nacos/" +id,String.class); } }
网站客户端/消费端Feign版 客户端/消费端是调用服务的那一端,不是提供服务的那一端
1、新建module: cloudalibaba-consumer-nacos-feign-order83
1 2 3 4 5 6 7 8 9 10 <dependency > <groupId > com.alibaba.cloud</groupId > <artifactId > spring-cloud-starter-alibaba-nacos-discovery</artifactId > </dependency > <dependency > <groupId > org.springframework.cloud</groupId > <artifactId > spring-cloud-starter-openfeign</artifactId > </dependency >
1 2 3 4 5 6 7 8 9 10 11 server: port: 83 spring: application: name: nacos-order-consumer cloud: nacos: discovery: server-addr: localhost:8848
1 2 3 4 5 6 7 8 9 @SpringBootApplication @EnableFeignClients @EnableDiscoveryClient public class OrderNacosFeignMain83 { public static void main (String[] args) { SpringApplication.run(OrderNacosFeignMain83.class, args); } }
1 2 3 4 5 6 7 @Component @FeignClient(name = "nacos-payment-provider") public interface PaymentFeignService { @GetMapping(value = "/payment/nacos/{id}") public String getPayment (@PathVariable("id") Integer id) ; }
1 2 3 4 5 6 7 8 9 10 11 12 @RestController public class OrderNacosFeignController { @Resource private PaymentFeignService paymentFeignService; @GetMapping("/consumer/payment/nacos/{id}") public String paymentInfo (@PathVariable("id") Integer id) { return paymentFeignService.getPayment(id); } }
Nacos与其他服务注册中心的对比 C是所有节点在同一时间看到的数据是一致的;而A的定义是所有的请求都会收到响应。
何时选择使用何种模式? 一般来说, 如果不需要存储服务级别的信息且服务实例是通过nacos-client注册,并能够保持心跳上报,那么就可以选择AP模式。当前主流的服务如 Spring cloud 和 Dubbo 服务,都适用于AP模式,AP模式为了服务的可能性而减弱了一致性,因此AP模式下只支持注册临时实例。
如果需要在服务级别编辑或者存储配置信息,那么 CP 是必须,K8S服务和DNS服务则适用于CP模式。 CP模式下则支持注册持久化实例,此时则是以 Raft 协议为集群运行模式,该模式下注册实例之前必须先注册服务,如果服务不存在,则会返回错误。
Nacos 支持AP和CP模式的切换
curl -X PUT ‘$NACOS_SERVER:8848/nacos/v1/ns/operator/switches?entry=serverMode&value=CP’
Nacos作为配置中心 基础配置 1、新建module: cloudalibaba-config-nacos-client3377
1 2 3 4 5 6 7 8 9 10 <dependency > <groupId > com.alibaba.cloud</groupId > <artifactId > spring-cloud-starter-alibaba-nacos-config</artifactId > </dependency > <dependency > <groupId > com.alibaba.cloud</groupId > <artifactId > spring-cloud-starter-alibaba-nacos-discovery</artifactId > </dependency >
Nacos同springcloud-config一样,在项目初始化时,要保证先从配置中心进行配置拉取, 拉取配置之后,才能保证项目的正常启动。
这里要写bootstrap.yaml + application.yaml:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 server: port: 3377 spring: application: name: nacos-config-client cloud: nacos: discovery: server-addr: localhost:8848 config: server-addr: localhost:8848 file-extension: yaml
1 2 3 spring: profiles: active: dev
1 2 3 4 5 6 7 @SpringBootApplication @EnableDiscoveryClient public class NacosConfigClientMain3377 { public static void main (String[] args) { SpringApplication.run(NacosConfigClientMain3377.class, args); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 @RestController @RefreshScope public class ConfigClientController { @Value("${config.info}") private String configInfo; @GetMapping("/config/info") public String getConfigInfo () { return configInfo; } }
配置读取规则 新增、修改配置:
公式: ${spring.application.name}-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}
==自带动态刷新:==( 已经加上了@RefreshScope)
三种方案加载配置 Namespace+Group+Data ID三者关系?为什么这么设计?
默认情况: Namespace=public,Group=DEFAULT_GROUP, 默认Cluster是DEFAULT
Nacos默认的命名空间是public,Namespace主要用来实现隔离。 比方说我们现在有三个环境:开发、测试、生产环境,我们就可以创建三个Namespace,不同的Namespace之间是隔离的。
Service就是微服务;一个Service可以包含多个Cluster(集群),Nacos默认Cluster是DEFAULT,Cluster是对指定微服务的一个虚拟划分。 比方说为了容灾,将Service微服务分别部署在了杭州机房和广州机房, 这时就可以给杭州机房的Service微服务起一个集群名称(HZ), 给广州机房的Service微服务起一个集群名称(GZ),还可以尽量让同一个机房的微服务互相调用,以提升性能。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 server: port: 3377 spring: application: name: nacos-config-client cloud: nacos: discovery: server-addr: localhost:8848 config: server-addr: localhost:8848 file-extension: yaml group: DEV_GROUP
1 2 3 4 spring: profiles: active: dev
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 server: port: 3377 spring: application: name: nacos-config-client cloud: nacos: discovery: server-addr: localhost:8848 config: server-addr: localhost:8848 file-extension: yaml group: DEV_GROUP namespace: deve9a0eac7-5df9-4a4b-beae-5ac71df2e76e
1 2 3 4 spring: profiles: active: dev
Nacos集群和持久化配置 Windows版持久化(设置为mysql数据库) Nacos默认自带的是嵌入式数据库derby,nacos集群中一台nacos一个数据库,按照上述,我们需要mysql数据库,统一nacos集群之间的数据来源
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 CREATE TABLE `config_info` ( `id` bigint (20 ) NOT NULL AUTO_INCREMENT COMMENT 'id' , `data_id` varchar (255 ) NOT NULL COMMENT 'data_id' , `group_id` varchar (255 ) DEFAULT NULL , `content` longtext NOT NULL COMMENT 'content' , `md5` varchar (32 ) DEFAULT NULL COMMENT 'md5' , `gmt_create` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间' , `gmt_modified` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '修改时间' , `src_user` text COMMENT 'source user' , `src_ip` varchar (50 ) DEFAULT NULL COMMENT 'source ip' , `app_name` varchar (128 ) DEFAULT NULL , `tenant_id` varchar (128 ) DEFAULT '' COMMENT '租户字段' , `c_desc` varchar (256 ) DEFAULT NULL , `c_use` varchar (64 ) DEFAULT NULL , `effect` varchar (64 ) DEFAULT NULL , `type` varchar (64 ) DEFAULT NULL , `c_schema` text, `encrypted_data_key` text NOT NULL COMMENT '秘钥' , PRIMARY KEY (`id`), UNIQUE KEY `uk_configinfo_datagrouptenant` (`data_id`,`group_id`,`tenant_id`) ) ENGINE= InnoDB DEFAULT CHARSET= utf8 COLLATE = utf8_bin COMMENT= 'config_info' ; CREATE TABLE `config_info_aggr` ( `id` bigint (20 ) NOT NULL AUTO_INCREMENT COMMENT 'id' , `data_id` varchar (255 ) NOT NULL COMMENT 'data_id' , `group_id` varchar (255 ) NOT NULL COMMENT 'group_id' , `datum_id` varchar (255 ) NOT NULL COMMENT 'datum_id' , `content` longtext NOT NULL COMMENT '内容' , `gmt_modified` datetime NOT NULL COMMENT '修改时间' , `app_name` varchar (128 ) DEFAULT NULL , `tenant_id` varchar (128 ) DEFAULT '' COMMENT '租户字段' , PRIMARY KEY (`id`), UNIQUE KEY `uk_configinfoaggr_datagrouptenantdatum` (`data_id`,`group_id`,`tenant_id`,`datum_id`) ) ENGINE= InnoDB DEFAULT CHARSET= utf8 COLLATE = utf8_bin COMMENT= '增加租户字段' ; CREATE TABLE `config_info_beta` ( `id` bigint (20 ) NOT NULL AUTO_INCREMENT COMMENT 'id' , `data_id` varchar (255 ) NOT NULL COMMENT 'data_id' , `group_id` varchar (128 ) NOT NULL COMMENT 'group_id' , `app_name` varchar (128 ) DEFAULT NULL COMMENT 'app_name' , `content` longtext NOT NULL COMMENT 'content' , `beta_ips` varchar (1024 ) DEFAULT NULL COMMENT 'betaIps' , `md5` varchar (32 ) DEFAULT NULL COMMENT 'md5' , `gmt_create` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间' , `gmt_modified` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '修改时间' , `src_user` text COMMENT 'source user' , `src_ip` varchar (50 ) DEFAULT NULL COMMENT 'source ip' , `tenant_id` varchar (128 ) DEFAULT '' COMMENT '租户字段' , `encrypted_data_key` text NOT NULL COMMENT '秘钥' , PRIMARY KEY (`id`), UNIQUE KEY `uk_configinfobeta_datagrouptenant` (`data_id`,`group_id`,`tenant_id`) ) ENGINE= InnoDB DEFAULT CHARSET= utf8 COLLATE = utf8_bin COMMENT= 'config_info_beta' ; CREATE TABLE `config_info_tag` ( `id` bigint (20 ) NOT NULL AUTO_INCREMENT COMMENT 'id' , `data_id` varchar (255 ) NOT NULL COMMENT 'data_id' , `group_id` varchar (128 ) NOT NULL COMMENT 'group_id' , `tenant_id` varchar (128 ) DEFAULT '' COMMENT 'tenant_id' , `tag_id` varchar (128 ) NOT NULL COMMENT 'tag_id' , `app_name` varchar (128 ) DEFAULT NULL COMMENT 'app_name' , `content` longtext NOT NULL COMMENT 'content' , `md5` varchar (32 ) DEFAULT NULL COMMENT 'md5' , `gmt_create` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间' , `gmt_modified` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '修改时间' , `src_user` text COMMENT 'source user' , `src_ip` varchar (50 ) DEFAULT NULL COMMENT 'source ip' , PRIMARY KEY (`id`), UNIQUE KEY `uk_configinfotag_datagrouptenanttag` (`data_id`,`group_id`,`tenant_id`,`tag_id`) ) ENGINE= InnoDB DEFAULT CHARSET= utf8 COLLATE = utf8_bin COMMENT= 'config_info_tag' ; CREATE TABLE `config_tags_relation` ( `id` bigint (20 ) NOT NULL COMMENT 'id' , `tag_name` varchar (128 ) NOT NULL COMMENT 'tag_name' , `tag_type` varchar (64 ) DEFAULT NULL COMMENT 'tag_type' , `data_id` varchar (255 ) NOT NULL COMMENT 'data_id' , `group_id` varchar (128 ) NOT NULL COMMENT 'group_id' , `tenant_id` varchar (128 ) DEFAULT '' COMMENT 'tenant_id' , `nid` bigint (20 ) NOT NULL AUTO_INCREMENT, PRIMARY KEY (`nid`), UNIQUE KEY `uk_configtagrelation_configidtag` (`id`,`tag_name`,`tag_type`), KEY `idx_tenant_id` (`tenant_id`) ) ENGINE= InnoDB DEFAULT CHARSET= utf8 COLLATE = utf8_bin COMMENT= 'config_tag_relation' ; CREATE TABLE `group_capacity` ( `id` bigint (20 ) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键ID' , `group_id` varchar (128 ) NOT NULL DEFAULT '' COMMENT 'Group ID,空字符表示整个集群' , `quota` int (10 ) unsigned NOT NULL DEFAULT '0' COMMENT '配额,0表示使用默认值' , `usage` int (10 ) unsigned NOT NULL DEFAULT '0' COMMENT '使用量' , `max_size` int (10 ) unsigned NOT NULL DEFAULT '0' COMMENT '单个配置大小上限,单位为字节,0表示使用默认值' , `max_aggr_count` int (10 ) unsigned NOT NULL DEFAULT '0' COMMENT '聚合子配置最大个数,,0表示使用默认值' , `max_aggr_size` int (10 ) unsigned NOT NULL DEFAULT '0' COMMENT '单个聚合数据的子配置大小上限,单位为字节,0表示使用默认值' , `max_history_count` int (10 ) unsigned NOT NULL DEFAULT '0' COMMENT '最大变更历史数量' , `gmt_create` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间' , `gmt_modified` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '修改时间' , PRIMARY KEY (`id`), UNIQUE KEY `uk_group_id` (`group_id`) ) ENGINE= InnoDB DEFAULT CHARSET= utf8 COLLATE = utf8_bin COMMENT= '集群、各Group容量信息表' ; CREATE TABLE `his_config_info` ( `id` bigint (64 ) unsigned NOT NULL , `nid` bigint (20 ) unsigned NOT NULL AUTO_INCREMENT, `data_id` varchar (255 ) NOT NULL , `group_id` varchar (128 ) NOT NULL , `app_name` varchar (128 ) DEFAULT NULL COMMENT 'app_name' , `content` longtext NOT NULL , `md5` varchar (32 ) DEFAULT NULL , `gmt_create` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP , `gmt_modified` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP , `src_user` text, `src_ip` varchar (50 ) DEFAULT NULL , `op_type` char (10 ) DEFAULT NULL , `tenant_id` varchar (128 ) DEFAULT '' COMMENT '租户字段' , `encrypted_data_key` text NOT NULL COMMENT '秘钥' , PRIMARY KEY (`nid`), KEY `idx_gmt_create` (`gmt_create`), KEY `idx_gmt_modified` (`gmt_modified`), KEY `idx_did` (`data_id`) ) ENGINE= InnoDB DEFAULT CHARSET= utf8 COLLATE = utf8_bin COMMENT= '多租户改造' ; CREATE TABLE `tenant_capacity` ( `id` bigint (20 ) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键ID' , `tenant_id` varchar (128 ) NOT NULL DEFAULT '' COMMENT 'Tenant ID' , `quota` int (10 ) unsigned NOT NULL DEFAULT '0' COMMENT '配额,0表示使用默认值' , `usage` int (10 ) unsigned NOT NULL DEFAULT '0' COMMENT '使用量' , `max_size` int (10 ) unsigned NOT NULL DEFAULT '0' COMMENT '单个配置大小上限,单位为字节,0表示使用默认值' , `max_aggr_count` int (10 ) unsigned NOT NULL DEFAULT '0' COMMENT '聚合子配置最大个数' , `max_aggr_size` int (10 ) unsigned NOT NULL DEFAULT '0' COMMENT '单个聚合数据的子配置大小上限,单位为字节,0表示使用默认值' , `max_history_count` int (10 ) unsigned NOT NULL DEFAULT '0' COMMENT '最大变更历史数量' , `gmt_create` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间' , `gmt_modified` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '修改时间' , PRIMARY KEY (`id`), UNIQUE KEY `uk_tenant_id` (`tenant_id`) ) ENGINE= InnoDB DEFAULT CHARSET= utf8 COLLATE = utf8_bin COMMENT= '租户容量信息表' ; CREATE TABLE `tenant_info` ( `id` bigint (20 ) NOT NULL AUTO_INCREMENT COMMENT 'id' , `kp` varchar (128 ) NOT NULL COMMENT 'kp' , `tenant_id` varchar (128 ) default '' COMMENT 'tenant_id' , `tenant_name` varchar (128 ) default '' COMMENT 'tenant_name' , `tenant_desc` varchar (256 ) DEFAULT NULL COMMENT 'tenant_desc' , `create_source` varchar (32 ) DEFAULT NULL COMMENT 'create_source' , `gmt_create` bigint (20 ) NOT NULL COMMENT '创建时间' , `gmt_modified` bigint (20 ) NOT NULL COMMENT '修改时间' , PRIMARY KEY (`id`), UNIQUE KEY `uk_tenant_info_kptenantid` (`kp`,`tenant_id`), KEY `idx_tenant_id` (`tenant_id`) ) ENGINE= InnoDB DEFAULT CHARSET= utf8 COLLATE = utf8_bin COMMENT= 'tenant_info' ; CREATE TABLE `users` ( `username` varchar (50 ) NOT NULL PRIMARY KEY, `password` varchar (500 ) NOT NULL , `enabled` boolean NOT NULL ); CREATE TABLE `roles` ( `username` varchar (50 ) NOT NULL , `role` varchar (50 ) NOT NULL , UNIQUE INDEX `idx_user_role` (`username` ASC , `role` ASC ) USING BTREE ); CREATE TABLE `permissions` ( `role` varchar (50 ) NOT NULL , `resource` varchar (255 ) NOT NULL , `action` varchar (8 ) NOT NULL , UNIQUE INDEX `uk_role_permission` (`role`,`resource`,`action`) USING BTREE ); INSERT INTO users (username, password, enabled) VALUES ('nacos' , '$2a$10$EuWPZHzz32dJN7jexM34MOeYirDdFAZm2kuWj7VEOJhhZkDrxfvUu' , TRUE );INSERT INTO roles (username, role) VALUES ('nacos' , 'ROLE_ADMIN' );
1 2 3 4 5 6 spring.datasource.platform =mysql db.num =1 db.url.0 =jdbc:mysql:// db.user =root #自己的mysql用户名 db.password =123456 #自己的mysql密码
Nacos2.x集群配置步骤(重点) 因选用的是nacos2.x版本,采用网上很多教学踩了很多坑,用了两天才把坑圆过来呜呜呜 (主要是nginx/VIP转发的配置)
==linux安装Nacos:== 下载nacos.tar.gz包到/opt/下
使用tar -zxvf nacos.tar.gz命令解压
将解压出来的nacos包复制到/mynacos/nacos cp -r nacos /mynacos/nacos
cd /mynacos/bin
==nacos2.x集群配置:==(三台才能构成集群) 1、修改nacos持久化:
1 vim /mynacos/nacos/conf/application.properties
1 2 3 4 5 6 spring.datasource.platform =mysql db.num =1 db.url.0 =jdbc:mysql:// db.user.0 =root #自己的mysql用户名 db.password.0 =123456 #自己的mysql密码
2、创建nacos_config数据库,并执行nacos提供的nacos-mysql.sql脚本 ,脚本文件放在 conf/nacos-mysql.sql:
3、修改 nacos 的集群配置:
进入到 conf 目录中,将集群文件拷贝一份以便进行后续的操作:
1 cp cluster.conf.example cluster.conf
在一台机器上部署nacos集群时,==不能将运行端口设置为连续端口==,比如8841、8842、8843,否则会发生端口冲突。 运行在8841端口nacos服务实际上会占用8841、8941、8942三个端口;
修改 ip 配置(如果是不同机器的同端口)
4、 修改配置文件中的 nacos 启动端口:
1 vim conf/application.properties
5、进入bin目录,修改运行内存,默认的 nacos 启动配置的消耗资源很大,如果服务器支持则可以不进行修改,服务器资源不支持可能会出现启动之后 nacos 集群循环重启
6、复制出两份 nacos 文件夹,制作集群:
1 2 cp -r /mynacos/nacos /mynacos/nacos-2 cp -r /mynacos/nacos /mynacos/nacos-3
7、启动 | 停止 nacos 集群 分别进入/nacos/bin、/nacos-2/bin、/nacos-3/bin目录下,执行命令./startup.sh启动nacos服务。
至此,基于 tar 包的 nacos 2.x 集群部署就结束了
==nginx环境配置== 从官网下载tar.gz包后,放到/opt目录下
使用tar -zxvf xxx.tar.gz解压
cd nginx-1.21.6
1 2 # 使用改命令配置nginx,若出现依赖项缺少,复制出错内容上网搜索对应依赖项安装后继续执行该语句 ./configure --prefix=/usr/local/nginx --with-http_ssl_module --with-stream
执行成功后:执行make && make install安装,安装的默认路径是 /usr/local/nginx
1 2 3 cd /usr/local/nginx/conf cp nginx.conf nginx.conf.bk #备份配置文件 vim nginx.conf
找到对应模块,修改内容如下: #代表修改过的地方
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 http { ...其他... upstream cluster{ server; server; server; } server { listen 1111; server_name; location / { proxy_pass http://cluster; } error_page 500 502 503 504 /50x.html; location = /50x.html { root html; } } ...其他... } stream{ upstream nacos-grpc { server; server; server; } server { listen 2111; proxy_pass nacos-grpc; } upstream nacos-tcp-5445 { server 192.168.111:4334 weight=1; server 192.168.111:5445 weight=1; server 192.168.111:6556 weight=1; } server { listen 2112; proxy_pass nacos-tcp-5445; } }
1 2 3 4 5 6 7 8 cd /usr/local/nginx/sbin ./nginx -c /usr/local/nginx/conf/nginx.conf # nginx其他命令: ./nginx -V #可查看nginx安装的插件 ./nginx -s reload #重载配置文件 ./nginx -s stop #强制中断 ./nginx -s quit #安全退出
1 systemctl stop firewalld
Sentinel 版本参考目录 SpringcloudAlibaba 处的表格对照 这里选用的是1.8.4
直接用java -jar命令运行,默认运行端口为8080,若端口被占用,可使用–server.port=xxx更改启动端口
访问sentinel管理界面:http://localhost:8080 登录账号密码均为sentinel
sentinel新建案例模块 1、新建网站服务端: cloudalibaba-sentinel-service8401
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 <dependency > <groupId > com.alibaba.cloud</groupId > <artifactId > spring-cloud-starter-alibaba-nacos-discovery</artifactId > </dependency > <dependency > <groupId > com.alibaba.csp</groupId > <artifactId > sentinel-datasource-nacos</artifactId > </dependency > <dependency > <groupId > com.alibaba.cloud</groupId > <artifactId > spring-cloud-starter-alibaba-sentinel</artifactId > </dependency > <dependency > <groupId > org.springframework.cloud</groupId > <artifactId > spring-cloud-starter-openfeign</artifactId > </dependency >
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 server: port: 8401 spring: application: name: cloudalibaba-sentinel-service cloud: nacos: discovery: server-addr: localhost:8848 sentinel: transport: dashboard: localhost:8080 port: 8719 management: endpoints: web: exposure: include: '*'
1 2 3 4 5 6 7 8 @SpringBootApplication @EnableDiscoveryClient public class MainApp8401 { public static void main (String[] args) { SpringApplication.run(MainApp8401.class, args); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 @RestController public class FlowLimitController { @GetMapping("/testA") public String testA () { return "------testA" ; } @GetMapping("/testB") public String testB () { return "------testB" ; } }
流量控制 流控模式(流控效果默认快速失败) 直接 该设置表示1秒钟内查询1(阈值)次就是OK,若超过次数1,就直接-快速失败,报默认错误:
结果:Blocked by Sentinel (flow limiting)
关联 当关联的资源达到阈值时,就限流自己
快速点击访问http://localhost:8401/testB过后,在一秒内访问http://localhost:8401/testA将会出现Blocked by Sentinel (flow limiting)
链路 新建接口OrderService:
1 2 3 4 5 6 7 8 9 @Service public class OrderService { @SentinelResource("goods") public String queryGoods () { return "查询商品。。。。" ; } }
1 2 3 4 5 6 7 8 9 10 11 12 @Resource private OrderService orderService;@GetMapping("/query") public String query () { return orderService.queryGoods(); } @GetMapping("/save") public String save () { return orderService.queryGoods(); }
yaml将 web-context-unify 设置为false,关闭context整合
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 server: port: 8401 spring: application: name: cloudalibaba-sentinel-service cloud: nacos: discovery: server-addr: localhost:8848 sentinel: transport: dashboard: localhost:8080 port: 8719 web-context-unify: false management: endpoints: web: exposure: include: '*'
链路模式依赖 /a /b这样的路径,而做整合后,原始的/a /b被隐藏掉了,无法判断上下游直接的关系了
该设置表示设置入口资源为 /save 时,对goods进行限流
流控效果 快速失败: 若限流,直接失败,抛出异常,默认为 Blocked by Sentinel (flow limiting)
预热WarmUp 如:秒杀系统在开启的瞬间,会有很多流量上来,很有可能把系统打死,预热方式就是把为了保护系统,可慢慢的把流量放进来,慢慢的把阀值增长到设置的阀值。
默认coldFactor为3,即请求 QPS 从 threshold / 3 开始,经预热时长逐渐升至设定的 QPS 阈值。
案例,阀值为10+预热时长设置5秒。 系统初始化的阀值为10 / 3 约等于3,即阀值刚开始为3;然后过了5秒后阀值才慢慢升高恢复到10
排队等待 匀速排队,阈值必须设置为QPS
熔断降级(无blockHandler) Sentinel 提供以下几种熔断策略:
):选择以慢调用比例作为阈值,需要设置允许的慢调用 RT(即最大的响应时间),请求的响应时间大于该值则统计为慢调用。当单位统计时长(statIntervalMs
)内请求数目大于设置的最小请求数目,并且慢调用的比例大于阈值,则接下来的熔断时长内请求会自动被熔断。经过熔断时长后熔断器会进入==探测恢复状态(HALF-OPEN 状态)==,若接下来的一个请求响应时间小于设置的慢调用 RT 则结束熔断,若大于设置的慢调用 RT 则会再次被熔断。 异常比例 (ERROR_RATIO
)内请求数目大于设置的最小请求数目,并且异常的比例大于阈值,则接下来的熔断时长内请求会自动被熔断。经过熔断时长后熔断器会进入==探测恢复状态(HALF-OPEN 状态)==,若接下来的一个请求成功完成(没有错误)则结束熔断,否则会再次被熔断。异常比率的阈值范围是 [0.0, 1.0]
,代表 0% - 100%。 异常数 (ERROR_COUNT
):当单位统计时长内的异常数目超过阈值之后会自动进行熔断。经过熔断时长后熔断器会进入==探测恢复状态(HALF-OPEN 状态)==,若接下来的一个请求成功完成(没有错误)则结束熔断,否则会再次被熔断。 探测恢复状态(HALF-OPEN 状态):
RT (1.8过后变为慢调用比例)(平均响应时间,秒级) 平均响应时间 超出响应阈值的比例超过比例阈值 且 在时间窗口内通过的请求>=5 ,两个条件同时满足后触发降级 窗口期过后关闭断路器 RT最大4900(更大的需要通过-Dcsp.sentinel.statistic.max.rt=XXXX才能生效)
异常比列(秒级) QPS >= 5 且异常比例(秒级统计)超过阈值时,触发降级;时间窗口结束后,关闭降级
异常数(分钟级) 异常数(秒级统计)超过阈值时,触发降级;时间窗口结束后,关闭降级
Sentinel 熔断降级会在调用链路中某个资源出现不稳定状态时(例如调用超时或异常比例升高),对这个资源的调用进行限制, 让请求快速失败,避免影响到其它的资源而导致级联错误。
当资源被降级后,在接下来的降级时间窗口之内,对该资源的调用都自动熔断(默认行为是抛出 DegradeException)。
慢调用比例 是什么:看上面大标题 熔断降级 统一介绍
1 2 3 4 5 6 7 8 @GetMapping("/testD") public String testD () { try { TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException e) { e.printStackTrace(); } log.info("testD 测试RT" ); return "------testD" ; }
使用工具或者狂点localhost:8401/testD,2000ms内大于5个请求调用testD,我们希望200毫秒处理完本次任务, 如果超过200毫秒还没处理完,在未来2秒钟的时间窗口内,断路器打开(保险丝跳闸)微服务不可用,保险丝跳闸断电了
异常比例 是什么:看上面大标题 熔断降级 统一介绍,类似hystrix的熔断规则
1 2 3 4 5 6 7 @GetMapping("/testC") public String testC () { log.info("testC 测试异常比例" ); int age = 10 /0 ; return "------testD" ; }
按照上述配置,统计时长1000ms内,>=5次请求且错误比例大于0.5就发生熔断降级 testC方法单独访问一次,必然来一次报错一次(int age = 10/0),调一次错一次;
异常数 是什么:看上面大标题 熔断降级 统一介绍
1 2 3 4 5 6 7 @GetMapping("/testE") public String testE () { log.info("testE 测试异常比例" ); int age = 10 /0 ; return "------testE 测试异常比例" ; }
http://localhost:8401/testE,第一次访问绝对报错,因为除数不能为零, 我们看到error窗口,但是达到5次报错后,进入熔断后降级。
热点key 何为热点 热点即经常访问的数据,很多时候我们希望统计或者限制某个热点数据中访问频次最高的TopN数据,并对其访问进行限流或者其它操作
兜底方法 分为系统默认和客户自定义,两种
之前的case,限流出问题后,都是用sentinel系统默认的提示:Blocked by Sentinel (flow limiting)
结论 ==@HystrixCommand 到@SentinelResource==
1 2 3 4 5 6 7 8 9 @GetMapping("/testHotKey") @SentinelResource(value = "testHotKey",blockHandler = "dealHandler_testHotKey") public String testHotKey (@RequestParam(value = "p1",required = false) String p1, @RequestParam(value = "p2",required = false) String p2) { return "------testHotKey" ; } public String dealHandler_testHotKey (String p1, String p2, BlockException exception) { return "-----dealHandler_testHotKey" ; }
@SentinelResource(value = “testHotKey”,blockHandler = “dealHandler_testHotKey”)
若为设置blockHandler ,将会返回异常页面给用户,非常不友好
@SentinelResource 处理的是Sentinel控制台配置的违规情况,有blockHandler方法配置的兜底处理;
RuntimeException int age = 10/0,这个是java运行时报出的运行时异常RunTimeException,@SentinelResource不管
总结 ==@SentinelResource主管配置出错,运行出错该走异常走异常==,hystrix的@HystrixCommand都管
系统规则 系统保护规则是从应用级别的入口流量进行控制,从单台机器的 load、CPU 使用率、平均 RT、入口 QPS 和并发线程数等几个维度监控应用指标,让系统尽可能跑在最大吞吐量的同时保证系统整体的稳定性。
系统保护规则是应用整体维度的,而不是资源维度的,并且仅对入口流量生效 。入口流量指的是进入应用的流量(EntryType.IN
),比如 Web 服务或 Dubbo 服务端接收的请求,都属于入口流量。
Load 自适应 (仅对 Linux/Unix-like 机器生效):系统的 load1 作为启发指标,进行自适应系统保护。当系统 load1 超过设定的启发值,且系统当前的并发线程数超过估算的系统容量时才会触发系统保护(BBR 阶段)。系统容量由系统的 maxQps * minRt
估算得出。设定参考值一般是 CPU cores * 2.5
。CPU usage (1.5.0+ 版本):当系统 CPU 使用率超过阈值即触发系统保护(取值范围 0.0-1.0),比较灵敏。平均 RT :当单台机器上所有入口流量的平均 RT 达到阈值即触发系统保护,单位是毫秒。并发线程数 :当单台机器上所有入口流量的并发线程数达到阈值即触发系统保护。入口 QPS :当单台机器上所有入口流量的 QPS 达到阈值即触发系统保护。==上面的是对具体某个请求或资源进行流控,这里是对整个微服务进行流控==
@SentinelResource的使用 ==Sentinel默认只标记Controller中的方法为资源,如果要标记其它方法,需要利用@SentinelResource注解==
blockHandler:只负责sentinel控制台配置违规(当未满足配置违规时,若java程序有异常,将不会处理 ,返回error页,当满足配置违规时,即使java程序有异常,也会进行处理,返回配置的blockHandler方法处理的值)且未设置sentinel持久化时,项目重启后,sentinel控制台配置将失效丢失
按资源名称限流+后续处理 给cloudalibaba-sentinel-service的
1 2 3 4 5 <dependency > <groupId > com.spongehah.springcloud</groupId > <artifactId > cloud-api-commons</artifactId > <version > ${project.version}</version > </dependency >
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @RestController public class RateLimitController { @GetMapping("/byResource") @SentinelResource(value = "byResource",blockHandler = "handleException") public CommonResult byResource () { return new CommonResult (200 ,"按资源名称限流测试OK" ,new Payment (2020L ,"serial001" )); } public CommonResult handleException (BlockException exception) { return new CommonResult (444 ,exception.getClass().getCanonicalName()+"\t 服务不可用" ); } }
按照Url地址限流+后续处理 通过访问的URL来限流,会返回Sentinel自带默认的限流处理信息
1 2 3 4 5 @GetMapping("/rateLimit/byUrl") @SentinelResource(value = "byUrl") public CommonResult byUrl () { return new CommonResult (200 ,"按url限流测试OK" ,new Payment (2020L ,"serial002" )); }
控制台中添加资源名时,使用/rateLimit/byUrl,是带 / 的,而不是@SentinelResource中的value属性
客户自定义通用限流处理逻辑 上面兜底方案面临的问题:
1 系统默认的,没有体现我们自己的业务要求。
2 依照现有条件,我们自定义的处理方法又和业务代码耦合在一块,不直观。
3 每个业务方法都添加一个兜底的,那代码膨胀加剧。
4 全局统一的处理方法没有体现。
1 2 3 4 5 6 7 8 9 10 11 public class CustomerBlockHandler { public static CommonResult handleException (BlockException exception) { return new CommonResult (2020 ,"自定义的限流处理信息......CustomerBlockHandler-------1" ); } public static CommonResult handleException2 (BlockException exception) { return new CommonResult (2020 ,"自定义的限流处理信息......CustomerBlockHandler-------2" ); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @GetMapping("/rateLimit/customerBlockHandler") @SentinelResource(value = "customerBlockHandler", blockHandlerClass = CustomerBlockHandler.class, blockHandler = "handleException2") public CommonResult customerBlockHandler () { return new CommonResult (200 , "按客户自定义限流处理逻辑" ); }
服务熔断功能(设置blockHandler和fallback) 整合ribbon 1、新建网站服务端提供者
1 2 3 4 5 6 7 8 9 10 <dependency > <groupId > com.alibaba.cloud</groupId > <artifactId > spring-cloud-starter-alibaba-nacos-discovery</artifactId > </dependency > <dependency > <groupId > com.spongehah.springcloud</groupId > <artifactId > cloud-api-commons</artifactId > <version > ${project.version}</version > </dependency >
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 server: port: 9003 spring: application: name: nacos-payment-provider cloud: nacos: discovery: server-addr: localhost:8848 management: endpoints: web: exposure: include: '*'
1 2 3 4 5 6 7 8 @SpringBootApplication @EnableDiscoveryClient public class PaymentMain9003 { public static void main (String[] args) { SpringApplication.run(PaymentMain9003.class, args); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 @RestController public class PaymentController { @Value("${server.port}") private String serverPort; public static HashMap<Long, Payment> hashMap = new HashMap <>(); static { hashMap.put(1L ,new Payment (1L ,"28a8c1e3bc2742d8848569891fb42181" )); hashMap.put(2L ,new Payment (2L ,"bba8c1e3bc2742d8848569891ac32182" )); hashMap.put(3L ,new Payment (3L ,"6ua8c1e3bc2742d8848569891xt92183" )); } @GetMapping(value = "/paymentSQL/{id}") public CommonResult<Payment> paymentSQL (@PathVariable("id") Long id) { Payment payment = hashMap.get(id); CommonResult<Payment> result = new CommonResult (200 ,"from mysql,serverPort: " +serverPort,payment); return result; } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 <dependency > <groupId > com.alibaba.cloud</groupId > <artifactId > spring-cloud-starter-alibaba-nacos-discovery</artifactId > </dependency > <dependency > <groupId > com.alibaba.cloud</groupId > <artifactId > spring-cloud-starter-alibaba-sentinel</artifactId > </dependency > <dependency > <groupId > com.spongehah.springcloud</groupId > <artifactId > cloud-api-commons</artifactId > <version > ${project.version}</version > </dependency >
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 server: port: 84 spring: application: name: nacos-order-consumer cloud: nacos: discovery: server-addr: localhost:8848 sentinel: transport: dashboard: localhost:8080 port: 8719 service-url: nacos-user-service: http://nacos-payment-provider
1 2 3 4 5 6 7 8 @SpringBootApplication @EnableDiscoveryClient public class OrderNacosMain84 { public static void main(String[] args) { SpringApplication.run(OrderNacosMain84.class, args); } }
1 2 3 4 5 6 7 8 9 10 @Configuration public class ApplicationContextConfig { @Bean @LoadBalanced public RestTemplate getRestTemplate () { return new RestTemplate (); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 @RestController @Slf4j public class CircleBreakerController { @Value("${service-url.nacos-user-service}") private String SERVICE_URL; @Resource private RestTemplate restTemplate; @RequestMapping("/consumer/fallback/{id}") @SentinelResource(value = "fallback") public CommonResult<Payment> fallback (@PathVariable Long id) { CommonResult<Payment> result = restTemplate.getForObject(SERVICE_URL + "/paymentSQL/" +id, CommonResult.class,id); if (id == 4 ) { throw new IllegalArgumentException ("IllegalArgumentException,非法参数异常...." ); }else if (result.getData() == null ) { throw new NullPointerException ("NullPointerException,该ID没有对应记录,空指针异常" ); } return result; } }
当@SentinelResource(value = “fallback”)注解不配置任何兜底方法时,给客户error页面,不友好
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 @SentinelResource(value = "fallback", fallback = "handlerFallback") public CommonResult<Payment> fallback (@PathVariable Long id) { CommonResult<Payment> result = restTemplate.getForObject(SERVICE_URL + "/paymentSQL/" + id, CommonResult.class, id); if (id == 4 ) { throw new IllegalArgumentException ("IllegalArgumentException,非法参数异常...." ); } else if (result.getData() == null ) { throw new NullPointerException ("NullPointerException,该ID没有对应记录,空指针异常" ); } return result; } public CommonResult handlerFallback (@PathVariable Long id, Throwable e) { Payment payment = new Payment (id, "null" ); return new CommonResult <>(444 , "兜底异常handlerFallback,exception内容 " + e.getMessage(), payment); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 @RequestMapping("/consumer/fallback/{id}") @SentinelResource(value = "fallback", blockHandler = "blockHandler") public CommonResult<Payment> fallback (@PathVariable Long id) { CommonResult<Payment> result = restTemplate.getForObject(SERVICE_URL + "/paymentSQL/" + id, CommonResult.class, id); if (id == 4 ) { throw new IllegalArgumentException ("非法参数异常...." ); } else if (result.getData() == null ) { throw new NullPointerException ("NullPointerException,该ID没有对应记录" ); } return result; } public CommonResult blockHandler (@PathVariable Long id, BlockException blockException) { Payment payment = new Payment (id, "null" ); return new CommonResult <>(445 , "blockHandler-sentinel限流,无此流水: blockException " + blockException.getMessage(), payment); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 @RequestMapping("/consumer/fallback/{id}") @SentinelResource(value = "fallback", fallback = "handlerFallback", blockHandler = "blockHandler") public CommonResult<Payment> fallback (@PathVariable Long id) { CommonResult<Payment> result = restTemplate.getForObject(SERVICE_URL + "/paymentSQL/" + id, CommonResult.class, id); if (id == 4 ) { throw new IllegalArgumentException ("非法参数异常...." ); } else if (result.getData() == null ) { throw new NullPointerException ("NullPointerException,该ID没有对应记录" ); } return result; } public CommonResult handlerFallback (@PathVariable Long id, Throwable e) { Payment payment = new Payment (id, "null" ); return new CommonResult <>(444 , "fallback,无此流水,exception " + e.getMessage(), payment); } public CommonResult blockHandler (@PathVariable Long id, BlockException blockException) { Payment payment = new Payment (id, "null" ); return new CommonResult <>(445 , "blockHandler-sentinel限流,无此流水: blockException " + blockException.getMessage(), payment); }
1 2 @SentinelResource(value = "fallback", fallback = "handlerFallback", blockHandler = "blockHandler", exceptionsToIgnore = {IllegalArgumentException.class})
整合Feign ==feign的fallback类只对服务端down机时使用==
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 <dependency > <groupId > com.alibaba.cloud</groupId > <artifactId > spring-cloud-starter-alibaba-nacos-discovery</artifactId > </dependency > <dependency > <groupId > com.alibaba.cloud</groupId > <artifactId > spring-cloud-starter-alibaba-sentinel</artifactId > </dependency > <dependency > <groupId > org.springframework.cloud</groupId > <artifactId > spring-cloud-starter-openfeign</artifactId > </dependency >
yaml 激活Sentinel对Feign的支持
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 server: port: 84 spring: application: name: nacos-order-consumer cloud: nacos: discovery: server-addr: localhost:8848 sentinel: transport: dashboard: localhost:8080 port: 8719 service-url: nacos-user-service: http://nacos-payment-provider management: endpoints: web: exposure: include: '*' feign: sentinel: enabled: true
1 2 3 4 5 6 7 8 9 @SpringBootApplication @EnableDiscoveryClient @EnableFeignClients public class OrderNacosMain84 { public static void main (String[] args) { SpringApplication.run(OrderNacosMain84.class, args); } }
1 2 3 4 5 6 7 8 @Component @FeignClient(value = "nacos-payment-provider",fallback = PaymentFallbackService.class) public interface PaymentService { @GetMapping(value = "/paymentSQL/{id}") public CommonResult<Payment> paymentSQL (@PathVariable("id") Long id) ; }
PaymentFallbackService implements PaymentService:
1 2 3 4 5 6 7 8 9 @Component public class PaymentFallbackService implements PaymentService { @Override public CommonResult<Payment> paymentSQL (Long id) { return new CommonResult <>(444 , "服务降级返回,没有该流水信息" , new Payment (id, "errorSerial......" )); } }
1 2 3 4 5 6 7 8 9 10 11 @Resource private PaymentService paymentService;@GetMapping(value = "/consumer/openfeign/{id}") public CommonResult<Payment> paymentSQL (@PathVariable("id") Long id) { if (id == 4 ) { throw new RuntimeException ("没有该id" ); } return paymentService.paymentSQL(id); }
熔断框架比较 sentinel规则持久化 一旦我们重启应用,sentinel规则将消失,生产环境需要将配置规则进行持久化
将限流配置规则持久化进Nacos保存,只要刷新8401某个rest地址,sentinel控制台 的流控规则就能看到,只要Nacos里面的配置不删除,针对8401上sentinel上的流控规则持续有效
pom添加: 前面已经添加
1 2 3 4 5 <dependency > <groupId > com.alibaba.csp</groupId > <artifactId > sentinel-datasource-nacos</artifactId > </dependency >
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 server: port: 8401 spring: application: name: cloudalibaba-sentinel-service cloud: nacos: discovery: server-addr: localhost:8848 sentinel: transport: dashboard: localhost:8080 port: 8719 datasource: ds1: nacos: server-addr: localhost:8848 dataId: ${spring.application.name} groupId: DEFAULT_GROUP data-type: json rule-type: flow management: endpoints: web: exposure: include: '*'
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 [ { "resource" : "/rateLimit/byUrl" , "limitApp" : "default" , "grade" : 1 , "count" : 1 , "strategy" : 0 , "controlBehavior" : 0 , "clusterMode" : false } ]
Seata 从1:1 -> 1:N -> N:N
单体应用被拆分成微服务应用,原来的三个模块被拆分成三个独立的应用,分别使用三个独立的数据源, 业务操作需要调用三个服务来完成。此时每个服务内部的数据一致性由本地事务来保证,但是全局的数据一致性问题没法保证。
1:Transaction ID XID: 全局唯一的事务ID
Transaction Coordinator (TC): 事务协调器,维护全局事务的运行状态,负责协调并驱动全局事务的提交或回滚;
Transaction Manager (TM): 控制全局事务的边界,负责开启一个全局事务,并最终发起全局提交或全局回滚的决议;
Resource Manager (RM): 控制分支事务,负责分支注册、状态汇报,并接收事务协调器的指令,驱动分支(本地)事务的提交和回滚
Seata安装配置启动(MySQL8) 版本参考目录 SpringcloudAlibaba 处的表格对照 这里选用searta1.5.1适配cloudalibaba2.2.8RELEASE
https://www.jianshu.com/p/37c3640284cc 主要参考seataServer.properties
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 server: port: 7091 spring: application: name: seata-server logging: config: classpath:logback-spring.xml file: path: ${user.home}/logs/seata extend: logstash-appender: destination: 127.0 .0 .1 :4560 kafka-appender: bootstrap-servers: 127.0 .0 .1 :9092 topic: logback_to_logstash console: user: username: seata password: seata seata: config: type: nacos nacos: server-addr: 127.0 .0 .1 :8848 namespace: group: SEATA_GROUP username: nacos password: nacos data-id: seataServer.properties registry: type: nacos nacos: application: seata-server server-addr: 127.0 .0 .1 :8848 group: SEATA_GROUP namespace: cluster: default username: nacos password: nacos security: secretKey: SeataSecretKey0c382ef121d778043159209298fd40bf3850a017 tokenValidityInMilliseconds: 1800000 ignore: urls: /,/**/*.css,/**/*.js,/**/*.html,/**/*.map,/**/*.svg,/**/*.webp,/**/*.ico,/console-fe/public/**,/api/v1/auth/login
启动nacos,在nacos面板中,添加seataServer.properties 和service.vgroupMapping.default_tx_group两个配置
seataServer.properties :
1 2 3 Data ID: seataServer.properties Group: SEATA_GROUP 配置格式: properties
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 seata.tx-service-group =default_tx_group seata.service.vgroup-mapping.default_tx_group =default store.mode =db store.db.datasource =druid store.db.dbType =mysql store.db.driverClassName =com.mysql.cj.jdbc.Driver store.db.url =jdbc:mysql:// store.db.user =root store.db.password =123456 store.db.minConn =5 store.db.maxConn =30 store.db.globalTable =global_table store.db.branchTable =branch_table store.db.lockTable =lock_table store.db.distributedLockTable =distributed_lock store.db.queryLimit =100 store.db.maxWait =5000 client.undo.dataValidation =true client.undo.logSerialization =jackson client.undo.onlyCareUpdateColumns =true client.undo.logTable =undo_log client.undo.compress.enable =true client.undo.compress.type =zip client.undo.compress.threshold =64k server.undo.logSaveDays =7 server.undo.logDeletePeriod =86400000 server.maxCommitRetryTimeout =-1 server.maxRollbackRetryTimeout =-1 server.recovery.committingRetryPeriod =1000 server.recovery.asynCommittingRetryPeriod =1000 server.recovery.rollbackingRetryPeriod =1000 server.recovery.timeoutRetryPeriod =1000 server.rollbackRetryTimeoutUnlockEnable =false server.distributedLockExpireTime =10000 server.xaerNotaRetryTimeout =60000 server.session.branchAsyncQueueSize =5000 server.session.enableBranchAsyncRemove =false client.rm.asyncCommitBufferLimit =10000 client.rm.lock.retryInterval =10 client.rm.lock.retryTimes =30 client.rm.lock.retryPolicyBranchRollbackOnConflict =true client.rm.reportRetryCount =5 client.rm.tableMetaCheckEnable =true client.rm.tableMetaCheckerInterval =60000 client.rm.sqlParserType =druid client.rm.reportSuccessEnable =false client.rm.sagaBranchRegisterEnable =false client.rm.sagaJsonParser =fastjson client.rm.tccActionInterceptorOrder =-2147482648 client.tm.commitRetryCount =5 client.tm.rollbackRetryCount =5 client.tm.defaultGlobalTransactionTimeout =60000 client.tm.degradeCheck =false client.tm.degradeCheckAllowTimes =10 client.tm.degradeCheckPeriod =2000 client.tm.interceptorOrder =-2147482648 tcc.fence.logTableName =tcc_fence_log tcc.fence.cleanPeriod =1h log.exceptionRate =100 metrics.enabled =false metrics.registryType =compact metrics.exporterList =prometheus metrics.exporterPrometheusPort =9898 transport.type =TCP transport.server =NIO transport.heartbeat =true transport.enableTmClientBatchSendRequest =false transport.enableRmClientBatchSendRequest =true transport.enableTcServerBatchSendResponse =false transport.rpcRmRequestTimeout =30000 transport.rpcTmRequestTimeout =30000 transport.rpcTcRequestTimeout =30000 transport.threadFactory.bossThreadPrefix =NettyBoss transport.threadFactory.workerThreadPrefix =NettyServerNIOWorker transport.threadFactory.serverExecutorThreadPrefix =NettyServerBizHandler transport.threadFactory.shareBossWorker =false transport.threadFactory.clientSelectorThreadPrefix =NettyClientSelector transport.threadFactory.clientSelectorThreadSize =1 transport.threadFactory.clientWorkerThreadPrefix =NettyClientWorkerThread transport.threadFactory.bossThreadSize =1 transport.threadFactory.workerThreadSize =default transport.shutdown.wait =3 transport.serialization =seata transport.compressor =none
service.vgroupMapping.default_tx_group,值为default。如果不新建这个文件,客户端启动,控制台会报错can not get cluster name in registry config ‘service.vgroupMapping.default_tx_group’, please make sure registry config correct
1 2 3 4 Data ID: service.vgroupMapping.default_tx_group Group: SEATA_GROUP 配置格式: text 配置内容:default
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 CREATE TABLE IF NOT EXISTS `global_table`( `xid` VARCHAR (128 ) NOT NULL , `transaction_id` BIGINT , `status` TINYINT NOT NULL , `application_id` VARCHAR (32 ), `transaction_service_group` VARCHAR (32 ), `transaction_name` VARCHAR (128 ), `timeout` INT , `begin_time` BIGINT , `application_data` VARCHAR (2000 ), `gmt_create` DATETIME, `gmt_modified` DATETIME, PRIMARY KEY (`xid`), KEY `idx_status_gmt_modified` (`status` , `gmt_modified`), KEY `idx_transaction_id` (`transaction_id`) ) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4; CREATE TABLE IF NOT EXISTS `branch_table`( `branch_id` BIGINT NOT NULL , `xid` VARCHAR (128 ) NOT NULL , `transaction_id` BIGINT , `resource_group_id` VARCHAR (32 ), `resource_id` VARCHAR (256 ), `branch_type` VARCHAR (8 ), `status` TINYINT, `client_id` VARCHAR (64 ), `application_data` VARCHAR (2000 ), `gmt_create` DATETIME(6 ), `gmt_modified` DATETIME(6 ), PRIMARY KEY (`branch_id`), KEY `idx_xid` (`xid`) ) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4; CREATE TABLE IF NOT EXISTS `lock_table`( `row_key` VARCHAR (128 ) NOT NULL , `xid` VARCHAR (128 ), `transaction_id` BIGINT , `branch_id` BIGINT NOT NULL , `resource_id` VARCHAR (256 ), `table_name` VARCHAR (32 ), `pk` VARCHAR (36 ), `status` TINYINT NOT NULL DEFAULT '0' COMMENT '0:locked ,1:rollbacking' , `gmt_create` DATETIME, `gmt_modified` DATETIME, PRIMARY KEY (`row_key`), KEY `idx_status` (`status`), KEY `idx_branch_id` (`branch_id`), KEY `idx_xid_and_branch_id` (`xid` , `branch_id`) ) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4; CREATE TABLE IF NOT EXISTS `distributed_lock`( `lock_key` CHAR (20 ) NOT NULL , `lock_value` VARCHAR (20 ) NOT NULL , `expire` BIGINT , primary key (`lock_key`) ) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4; INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('AsyncCommitting' , ' ' , 0 );INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('RetryCommitting' , ' ' , 0 );INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('RetryRollbacking' , ' ' , 0 );INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('TxTimeoutCheck' , ' ' , 0 );
若是在windows环境下,将 seata/target/seata-server.jar包和所要更换的mysql-connector-java-8.0.29.jar传送到Linux服务器/虚拟机
1 2 3 4 5 jar -xvf seata-server.jar #先解压jar包 rm -rf BOOT-INF/lib/mysql-connector-java-5.1.35.jar #先移除mysql5jar报 mv mysql-connector-java-8.0.29.jar BOOT-INF/lib/ #将mysql8jar包移动过去 rm -rf seata-server.jar #删除原seata-server.jar jar -cfM0 seata-server.jar BOOT-INF/ META-INF/ org/ #重新将这三个目录打包为seata-server.jar
seata-client微服务新建配置启动 案例演示数据库准备 这里我们会创建三个服务,一个订单服务,一个库存服务,一个账户服务。
当用户下单时,会在订单服务中创建一个订单,然后通过远程调用库存服务来扣减下单商品的库存, 再通过远程调用账户服务来扣减用户账户里面的余额, 最后在订单服务中修改订单状态为已完成。
seata_order:存储订单的数据库; CREATE DATABASE seata_order;
seata_storage:存储库存的数据库; CREATE DATABASE seata_storage;
seata_account:存储账户信息的数据库。 CREATE DATABASE seata_account;
1 2 3 4 5 6 7 8 9 10 CREATE TABLE t_order ( `id` BIGINT (11 ) NOT NULL AUTO_INCREMENT PRIMARY KEY, `user_id` BIGINT (11 ) DEFAULT NULL COMMENT '用户id' , `product_id` BIGINT (11 ) DEFAULT NULL COMMENT '产品id' , `count` INT (11 ) DEFAULT NULL COMMENT '数量' , `money` DECIMAL (11 ,0 ) DEFAULT NULL COMMENT '金额' , `status` INT (1 ) DEFAULT NULL COMMENT '订单状态:0:创建中;1:已完结' ) ENGINE= INNODB AUTO_INCREMENT= 7 DEFAULT CHARSET= utf8; SELECT * FROM t_order;
seata_storage库下建t_storage 表:
1 2 3 4 5 6 7 8 9 10 11 12 13 CREATE TABLE t_storage ( `id` BIGINT (11 ) NOT NULL AUTO_INCREMENT PRIMARY KEY, `product_id` BIGINT (11 ) DEFAULT NULL COMMENT '产品id' , `total` INT (11 ) DEFAULT NULL COMMENT '总库存' , `used` INT (11 ) DEFAULT NULL COMMENT '已用库存' , `residue` INT (11 ) DEFAULT NULL COMMENT '剩余库存' ) ENGINE= INNODB AUTO_INCREMENT= 2 DEFAULT CHARSET= utf8; INSERT INTO seata_storage.t_storage(`id`, `product_id`, `total`, `used`, `residue`)VALUES ('1' , '1' , '100' , '0' , '100' );SELECT * FROM t_storage;
seata_account库下建t_account 表:
1 2 3 4 5 6 7 8 9 10 11 CREATE TABLE t_account ( `id` BIGINT (11 ) NOT NULL AUTO_INCREMENT PRIMARY KEY COMMENT 'id' , `user_id` BIGINT (11 ) DEFAULT NULL COMMENT '用户id' , `total` DECIMAL (10 ,0 ) DEFAULT NULL COMMENT '总额度' , `used` DECIMAL (10 ,0 ) DEFAULT NULL COMMENT '已用余额' , `residue` DECIMAL (10 ,0 ) DEFAULT '0' COMMENT '剩余可用额度' ) ENGINE= INNODB AUTO_INCREMENT= 2 DEFAULT CHARSET= utf8; INSERT INTO seata_account.t_account(`id`, `user_id`, `total`, `used`, `residue`) VALUES ('1' , '1' , '1000' , '0' , '1000' ); SELECT * FROM t_account;
1 2 3 4 5 6 7 8 9 10 11 12 13 CREATE TABLE `undo_log` ( `id` bigint (20 ) NOT NULL AUTO_INCREMENT, `branch_id` bigint (20 ) NOT NULL , `xid` varchar (100 ) NOT NULL , `context` varchar (128 ) NOT NULL , `rollback_info` longblob NOT NULL , `log_status` int (11 ) NOT NULL , `log_created` datetime NOT NULL , `log_modified` datetime NOT NULL , `ext` varchar (100 ) DEFAULT NULL , PRIMARY KEY (`id`), UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`) ) ENGINE= InnoDB AUTO_INCREMENT= 1 DEFAULT CHARSET= utf8;
新建订单Order-Module 参考文章:目录Seata处三篇加下面一篇
https://blog.csdn.net/Jason_We/article/details/113538673 mysql-druid启动报错参考
1、新建module seata-order-service2001
2、pom: 父工程下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 <dependencies > <dependency > <groupId > com.alibaba.cloud</groupId > <artifactId > spring-cloud-starter-alibaba-nacos-discovery</artifactId > </dependency > <dependency > <groupId > com.alibaba.cloud</groupId > <artifactId > spring-cloud-starter-alibaba-seata</artifactId > </dependency > <dependency > <groupId > org.springframework.cloud</groupId > <artifactId > spring-cloud-starter-openfeign</artifactId > </dependency > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-web</artifactId > </dependency > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-actuator</artifactId > </dependency > <dependency > <groupId > mysql</groupId > <artifactId > mysql-connector-java</artifactId > </dependency > <dependency > <groupId > com.alibaba</groupId > <artifactId > druid-spring-boot-starter</artifactId > <version > 1.1.17</version > </dependency > <dependency > <groupId > org.mybatis.spring.boot</groupId > <artifactId > mybatis-spring-boot-starter</artifactId > </dependency > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-test</artifactId > <scope > test</scope > </dependency > <dependency > <groupId > org.projectlombok</groupId > <artifactId > lombok</artifactId > <optional > true</optional > </dependency > </dependencies >
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 server: port: 2001 spring: application: name: seata-order-service cloud: nacos: server-addr: localhost:8848 discovery: group: SEATA_GROUP register-enabled: true datasource: driver-class-name: com.mysql.cj.jdbc.Driver url: jdbc:mysql://localhost:3306/seata_order username: root password: 123456 seata: application-id: ${spring.application.name} tx-service-group: default_tx_group service: vgroup-mapping: default_tx_group: default config: type: nacos nacos: server-addr: 127.0 .0 .1 :8848 group: SEATA_GROUP username: nacos password: nacos data-id: "seataServer.properties" registry: type: nacos nacos: application: seata-server server-addr: 127.0 .0 .1 :8848 group: SEATA_GROUP cluster: default username: nacos password: nacos feign: hystrix: enabled: false logging: level: io: seata: info mybatis: mapperLocations: classpath:mapper/*.xml
1 2 3 4 5 6 7 8 9 @SpringBootApplication @EnableDiscoveryClient @EnableFeignClients public class SeataOrderMainApp2001 { public static void main (String[] args) { SpringApplication.run(SeataOrderMainApp2001.class, args); } }
1 2 3 4 5 6 7 8 9 10 11 12 @Data @AllArgsConstructor @NoArgsConstructor public class CommonResult <T> { private Integer code; private String message; private T data; public CommonResult (Integer code, String message) { this (code, message, null ); } }
Order 订单
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 @Data @AllArgsConstructor @NoArgsConstructor public class Order { private Long id; private Long userId; private Long productId; private Integer count; private BigDecimal money; private Integer status; }
1 2 3 4 5 6 7 8 9 10 11 12 @Mapper public interface OrderDao { void create (Order order) ; void update (@Param("userId") Long userId, @Param("status") Integer status) ; }
7、resources文件夹下新建mapper文件夹后添加 OrderMapper.xml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 <?xml version="1.0" encoding="UTF-8" ?> <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" > <mapper namespace ="com.spongehah.seata.dao.OrderDao" > <resultMap id ="BaseResultMap" type ="com.spongehah.seata.domain.Order" > <id column ="id" property ="id" jdbcType ="BIGINT" /> <result column ="user_id" property ="userId" jdbcType ="BIGINT" /> <result column ="product_id" property ="productId" jdbcType ="BIGINT" /> <result column ="count" property ="count" jdbcType ="INTEGER" /> <result column ="money" property ="money" jdbcType ="DECIMAL" /> <result column ="status" property ="status" jdbcType ="INTEGER" /> </resultMap > <insert id ="create" > INSERT INTO `t_order` (`id`, `user_id`, `product_id`, `count`, `money`, `status`) VALUES (NULL, #{userId}, #{productId}, #{count}, #{money}, 0); </insert > <update id ="update" > UPDATE `t_order` SET status = 1 WHERE user_id = #{userId} AND status = #{status}; </update > </mapper >
1 2 3 4 5 6 public interface OrderService { void create (Order order) ; }
OrderServiceImpl :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 @Service @Slf4j public class OrderServiceImpl implements OrderService { @Resource private OrderDao orderDao; @Resource private StorageService storageService; @Resource private AccountService accountService; @Override public void create (Order order) { log.info("------->下单开始" ); orderDao.create(order); log.info("------->order-service中扣减库存开始" ); storageService.decrease(order.getProductId(),order.getCount()); log.info("------->order-service中扣减库存结束" ); log.info("------->order-service中扣减余额开始" ); accountService.decrease(order.getUserId(),order.getMoney()); log.info("------->order-service中扣减余额结束" ); log.info("------->order-service中修改订单状态开始" ); orderDao.update(order.getUserId(),0 ); log.info("------->order-service中修改订单状态结束" ); log.info("------->下单结束" ); } }
1 2 3 4 5 6 7 8 9 10 @FeignClient(value = "seata-account-service") public interface AccountService { @PostMapping("/account/decrease") public CommonResult decrease (@RequestParam("id") Long userId, @RequestParam("money") BigDecimal money) ; }
1 2 3 4 5 6 7 8 9 @FeignClient(value = "seata-storage-service") public interface StorageService { @PostMapping("/storage/decrease") public CommonResult decrease (@RequestParam("productId") Long productId,@RequestParam("count") Integer count) ; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 @RestController @Slf4j public class OrderController { @Resource private OrderService orderService; @GetMapping("/order/create") public CommonResult create (Order order) { orderService.create(order); return new CommonResult (200 ,"订单创建成功!" ); } }
新建库存Storage-Module 1、新建module: seata-storage-service2002
3、yaml:和订单模块Order-Module一致,就修改端口号为2002,数据库url: jdbc:mysql://localhost:3306/seata_storage
1 2 3 4 5 6 7 8 @SpringBootApplication @EnableFeignClients @EnableDiscoveryClient public class SeataStorageServiceApplication2002 { public static void main (String[] args) { SpringApplication.run(SeataStorageServiceApplication2002 .class, args); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 @Data @AllArgsConstructor @NoArgsConstructor public class CommonResult <T> { private Integer code; private String message; private T data; public CommonResult (Integer code, String message) { this (code, message, null ); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 @Data @AllArgsConstructor @NoArgsConstructor public class Storage { private Long id; private Long productId; private Integer total; private Integer used; private Integer residue; }
1 2 3 4 5 6 7 @Mapper public interface StorageDao { void decrease (@Param("productId") Long productId, @Param("count") Integer count) ; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 <?xml version="1.0" encoding="UTF-8" ?> <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" > <mapper namespace ="com.spongehah.seata.dao.StorageDao" > <resultMap id ="BaseResultMap" type ="com.spongehah.seata.domain.Storage" > <id column ="id" property ="id" jdbcType ="BIGINT" /> <result column ="product_id" property ="productId" jdbcType ="BIGINT" /> <result column ="total" property ="total" jdbcType ="INTEGER" /> <result column ="used" property ="used" jdbcType ="INTEGER" /> <result column ="residue" property ="residue" jdbcType ="INTEGER" /> </resultMap > <update id ="decrease" > UPDATE t_storage SET used = used + #{count}, residue = residue - #{count} WHERE product_id = #{productId} </update > </mapper >
1 2 3 4 5 6 public interface StorageService { void decrease (Long productId, Integer count) ; }
StorageServiceImpl implements StorageService
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 @Service public class StorageServiceImpl implements StorageService { private static final Logger LOGGER = LoggerFactory.getLogger(StorageServiceImpl.class); @Resource private StorageDao storageDao; @Override public void decrease (Long productId, Integer count) { LOGGER.info("------->storage-service中扣减库存开始" ); storageDao.decrease(productId,count); LOGGER.info("------->storage-service中扣减库存结束" ); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 @RestController public class StorageController { @Resource private StorageService storageService; @PostMapping("/storage/decrease") public CommonResult decrease (@RequestParam("productId") Long productId, @RequestParam("count") Integer count) { storageService.decrease(productId, count); return new CommonResult (200 , "扣减库存成功!" ); } }
新建账户Account-Module 1、新建module seata-account-service2003
3、yaml:和订单模块Order-Module一致,就修改端口号为2003,数据库url: jdbc:mysql://localhost:3306/seata_account
1 2 3 4 5 6 7 8 @SpringBootApplication @EnableDiscoveryClient @EnableFeignClients public class SeataAccountMainApp2003 { public static void main (String[] args) { SpringApplication.run(SeataAccountMainApp2003.class, args); } }
1 2 3 4 5 6 7 8 9 10 11 12 @Data @AllArgsConstructor @NoArgsConstructor public class CommonResult <T> { private Integer code; private String message; private T data; public CommonResult (Integer code, String message) { this (code, message, null ); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 @Data @AllArgsConstructor @NoArgsConstructor public class Account { private Long id; private Long userId; private BigDecimal total; private BigDecimal used; private BigDecimal residue; }
1 2 3 4 5 6 7 8 @Mapper public interface AccountDao { void decrease (@Param("userId") Long userId, @Param("money") BigDecimal money) ; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 <?xml version="1.0" encoding="UTF-8" ?> <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" > <mapper namespace ="com.spongehah.seata.dao.AccountDao" > <resultMap id ="BaseResultMap" type ="com.spongehah.seata.domain.Account" > <id column ="id" property ="id" jdbcType ="BIGINT" /> <result column ="user_id" property ="userId" jdbcType ="BIGINT" /> <result column ="total" property ="total" jdbcType ="DECIMAL" /> <result column ="used" property ="used" jdbcType ="DECIMAL" /> <result column ="residue" property ="residue" jdbcType ="DECIMAL" /> </resultMap > <update id ="decrease" > UPDATE t_account SET residue = residue - #{money}, used = used + #{money} WHERE user_id = #{userId}; </update > </mapper >
1 2 3 4 5 6 7 8 public interface AccountService { void decrease (@RequestParam("userId") Long userId, @RequestParam("money") BigDecimal money) ; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 @Service @Slf4j public class AccountServiceImpl implements AccountService { @Resource private AccountDao accountDao; @Override public void decrease (Long userId, BigDecimal money) { log.info("------->account-service中扣减账户余额开始" ); accountDao.decrease(userId,money); log.info("------->account-service中扣减账户余额结束" ); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @RestController public class AccountController { @Resource private AccountService accountService; @PostMapping("/account/decrease") public CommonResult decrease (@RequestParam("userId") Long userId, @RequestParam("money") BigDecimal money) { accountService.decrease(userId, money); return new CommonResult (200 , "扣减账户余额成功!" ); } }
测试:@GlobalTransactional的使用 ==注意事项:==
2、mapper.xml中,我是数据库已经设置自增主键,但是还设置了 useGeneratedKeys=”true” keyProperty=”id”,导致idea控制台也一直刷屏,导致即使程序没有异常,所有微服务的表都全部fallback,没有试过数据库没有设置自增主键的情况,若有请自行尝试
1 2 try {TimeUnit.SECONDS.sleep(20 );} catch (InterruptedException e) {e.printStackTrace();}
1 2 3 4 5 6 7 8 9 10 @Override @GlobalTransactional(name = "create-order",rollbackFor = Exception.class) public void create (Order order) { log.info("------->下单开始" ); ........ log.info("------->下单结束" ); }
Seata原理补充 Simple Extensible Autonomous Transaction Architecture,简单可扩展自治事务框架
TM 开启分布式事务(TM 向 TC 注册全局事务记录);
按业务场景,编排数据库、服务等事务内资源(RM 向 TC 汇报资源准备状态 );
TM 结束分布式事务,事务一阶段结束(TM 通知 TC 提交/回滚分布式事务);
TC 汇总事务信息,决定分布式事务是提交还是回滚;
TC 通知所有 RM 提交/回滚 资源,事务二阶段结束。
一阶段加载: 在一阶段,Seata 会拦截“业务 SQL”, 1 解析 SQL 语义,找到“业务 SQL”要更新的业务数据,在业务数据被更新前,将其保存成“before image”, 2 执行“业务 SQL”更新业务数据, 3 在业务数据更新之后,其保存成“after image”,最后生成行锁。 以上操作全部在一个数据库事务内完成 ,这样保证了一阶段操作的原子性。
二阶段如是顺利提交的话, 因为“业务 SQL”在一阶段已经提交至数据库,所以Seata框架只需将一阶段保存的快照数据和行锁删掉,完成数据清理即可。
二阶段如果是回滚的话,Seata 就需要回滚一阶段已经执行的“业务 SQL”,还原业务数据。回滚方式便是用“before image”还原业务数据 ;但在还原前要首先要校验脏写,对比“数据库当前业务数据”和 “after image” , 如果两份数据完全一致就说明没有脏写,可以还原业务数据,如果不一致就说明有脏写,出现脏写就需要转人工处理 。
debug时,可以查看各业务数据库中的undo_log表中的rollback_info字段,里面存有前后镜像 的信息等