时间滚动代码(滚动时间窗口法)
写在前面,如果喜欢每周分享的干货内容,请留下一个宝贵的赞并且分享给别人,谢谢!
涉及概念
服务等级(service-level):
核心(core)
重要(important)
普通(normal)
次要(secondary)
非必需(dispensable)
服务隔离
消费者的每个消费的服务之间互相独立,互不影响,不会因为某个服务的故障或者不可用造成其他服务的故障或者不可用。
隔离策略
线程池隔离: 使用线程池作为隔离的实现方式,每个隔离单元拥有自己单独的线程池,调用依赖服务时,申请一个新的线程执行真正的调用逻辑,线程池或者队列满了之后,拒绝服务。
信号量隔离: 使用信号量作为隔离的实现方式,每个隔离单元拥有配置了自己的信号量阈值,调用依赖服务时,在原请求线程中申请新的信号量,如果申请到,继续在原线程中执行调用逻辑,信号量超过阈值之后,拒绝服务。
服务限流
按照服务隔离的原则,对每个服务的流量进行限制,不会因为某个或某几个服务的请求量过大而造成其他服务的不可用
展开全文
服务熔断
当消费方依赖的某个服务不可用时,动态的隔绝对该服务的依赖。消费方不再继续请求该服务,尝试使用降级逻辑。当服务恢复可用时,能立即感知并恢复对该服务的依赖。
服务降级
消费方依赖的某个服务不可用(异常或者超时),需要采取的补偿性措施。
dubbo与hystrix比较
dubbo的限流,降级方案
消费端通过配置acitves限制消费端调用的并发量,在达到最大并发量之后等待一个timeout时间再重试。
服务端通过配置executes限制服务端接口的线程最大数量,达到最大数量之后直接抛出异常。
超时配置,当超时且超过重试次数之后,抛出异常。消费方实现自己的降级逻辑。
当没有可用的服务提供者之后,消费者直接短路,消费方实现自己的短路逻辑。
通过注册中心的URL实现服务运行时参数的动态配置。
限流或隔离的粒度是以接口方法为粒度。
dubbo自带的监控不够强大,需要自己扩展或者使用第三方扩展。
hystrix的限流,降级方案
自定义限流位置。
提供超时时间配置,当超时或者抛出非BadRequestException之后,其他任何错误,异常或者超时时,尝试降级逻辑。
对一段时间内的错误,超时率进行统计,达到配置的阈值时自动短路,调用降级逻辑。
服务短路后提供自动恢复机制,快速恢复服务。
通过内置的archaius或者第三方配置框架实现服务运行时参数的动态配置。
隔离粒度可以自定义,模块,接口,方法粒度都支持。
提供了基于event-stream的扩展工具和官方的dashboard进行监控。但目前官方提供的even-stream是基于servlet的
两种方案的优劣
dubbo同时提供消费端和服务端的限流。hystrix只提供消费端限流。
dubbo的消费端限流的信号量是以服务器为粒度,而hystrix的消费端限流是以整个提供方集群为粒度(更合理)。
dubbo不提供服务容错降级后的自动短路。hystrix支持自动短路和自动恢复。
dubbo管理平台中的动态配置用通知的方式通知消费者,但存在不生效等一些bug。hystrix利用archaius的动态配置方案从本地或URL中轮询拉取配置。
dubbo的限流其实是基于信号量的,而hystrix同时支持信号量和线程池的限流。
hystrix与dubbo集成的方案
实现方式
在dubbo的消费端利用dubbo的filter对所有调用进行拦截扩展代码如下: DubboHystrixFilter.java
public class DubboHystrixFilter implements Filter {
@Override
public Result invoke(Invoker? invoker, Invocation invocation) throws RpcException {
// 是否启用hystrix
if (!hystrixIsOpen) {
return invoker.invoke(invocation);
}
String group = invoker.getUrl().getParameter(HystrixConstants.GROUP_KEY);
URL url = invoker.getUrl();
// 未配置groupKey的接口不进行限流
if (StringUtils.isBlank(group)) {
group = invoker.getUrl().getParameter(Constants.ID_KEY);
}
// int serviceLevel = invoker.getUrl().getParameter(HystrixConstants.SERVICE_LEVEL_KEY, ServiceLevelEnum.NORMAL.getLevel());
DubboHystrixCommand command = new DubboHystrixCommand(invoker, invocation, group);
return command.execute();
}
}
public class DubboHystrixFilter implements Filter {
@Override
public Result invoke(Invoker? invoker, Invocation invocation) throws RpcException {
// 是否启用hystrix
if (!hystrixIsOpen) {
return invoker.invoke(invocation);
}
String group = invoker.getUrl().getParameter(HystrixConstants.GROUP_KEY);
URL url = invoker.getUrl();
// 未配置groupKey的接口不进行限流
if (StringUtils.isBlank(group)) {
group = invoker.getUrl().getParameter(Constants.ID_KEY);
}
// int serviceLevel = invoker.getUrl().getParameter(HystrixConstants.SERVICE_LEVEL_KEY, ServiceLevelEnum.NORMAL.getLevel());
DubboHystrixCommand command = new DubboHystrixCommand(invoker, invocation, group);
return command.execute();
}
}
DubboHystrixCommand.java
public class DubboHystrixCommand extends HystrixCommandResult
{ // 统计一定时间内成功请求数
private static final int STATUSTIME = 20000; // 用于计算百分比的滚动窗口时间长度(毫秒)
private static final int ROLLINGTIME = 60000; private Invoker? invoker; private Invocation invocation; public DubboHystrixCommand(Invoker? invoker, Invocation invocation, String group) { // 使用dubbo配置的优先级 method interface application 同等级别 consumer provider
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(group)) // 组名使用模块名称
// 服务等级为NORMAL的隔离粒度为模块,其他服务等级的隔离粒度为接口
.andCommandKey(HystrixCommandKey.Factory.asKey(group))
.andCommandPropertiesDefaults(
HystrixCommandProperties.Setter()
.withMetricsRollingPercentileWindowInMilliseconds(ROLLINGTIME)
.withMetricsRollingStatisticalWindowInMilliseconds(STATUSTIME) // 使用信号量隔离的方式
.withExecutionIsolationStrategy(ExecutionIsolationStrategy.SEMAPHORE)
// 最大并发量,配置的优先级为
.withExecutionIsolationSemaphoreMaxConcurrentRequests(invoker.getUrl().getParameter(
HystrixConstants.CONCURRENCY_KEY, HystrixConstants.DEFAULT_MAX_CONCURRENCY))
.withExecutionTimeoutEnabled(false) // 是否开启熔断功能
.withCircuitBreakerEnabled(true)
)); this.invoker = invoker; this.invocation = invocation;
}
@Override protected Result run() throws Exception
{ return invoker.invoke(invocation);
}
@Override protected Result getFallback() { if (executionResult.isResponseSemaphoreRejected()) {
MapString, Object map = new HashMapString, Object(); map.put("resultCode", DHAPCode.COM_FLOW_OVERRUN.getCode()); map.put("resultMsg", DHAPCode.COM_FLOW_OVERRUN.getMsg());
Result result = new RpcResult(map); return result;
}
MapString, Object map = new HashMapString, Object(); map.put("resultCode", DHAPCode.COM_SERVER_ERROR.getCode()); map.put("resultMsg", DHAPCode.COM_SERVER_ERROR.getMsg());
Result result = new RpcResult(map); return result;
}
}
public class DubboHystrixCommand extends HystrixCommandResult
{ // 统计一定时间内成功请求数
private static final int STATUSTIME = 20000; // 用于计算百分比的滚动窗口时间长度(毫秒)
private static final int ROLLINGTIME = 60000; private Invoker? invoker; private Invocation invocation; public DubboHystrixCommand(Invoker? invoker, Invocation invocation, String group) { // 使用dubbo配置的优先级 method interface application 同等级别 consumer provider
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(group)) // 组名使用模块名称
// 服务等级为NORMAL的隔离粒度为模块,其他服务等级的隔离粒度为接口
.andCommandKey(HystrixCommandKey.Factory.asKey(group))
.andCommandPropertiesDefaults(
HystrixCommandProperties.Setter()
.withMetricsRollingPercentileWindowInMilliseconds(ROLLINGTIME)
.withMetricsRollingStatisticalWindowInMilliseconds(STATUSTIME) // 使用信号量隔离的方式
.withExecutionIsolationStrategy(ExecutionIsolationStrategy.SEMAPHORE)
// 最大并发量,配置的优先级为
.withExecutionIsolationSemaphoreMaxConcurrentRequests(invoker.getUrl().getParameter(
HystrixConstants.CONCURRENCY_KEY, HystrixConstants.DEFAULT_MAX_CONCURRENCY))
.withExecutionTimeoutEnabled(false) // 是否开启熔断功能
.withCircuitBreakerEnabled(true)
)); this.invoker = invoker; this.invocation = invocation;
}
@Override protected Result run() throws Exception
{ return invoker.invoke(invocation);
}
@Override protected Result getFallback() { if (executionResult.isResponseSemaphoreRejected()) {
MapString, Object map = new HashMapString, Object(); map.put("resultCode", DHAPCode.COM_FLOW_OVERRUN.getCode()); map.put("resultMsg", DHAPCode.COM_FLOW_OVERRUN.getMsg());
Result result = new RpcResult(map); return result;
}
MapString, Object map = new HashMapString, Object(); map.put("resultCode", DHAPCode.COM_SERVER_ERROR.getCode()); map.put("resultMsg", DHAPCode.COM_SERVER_ERROR.getMsg());
Result result = new RpcResult(map); return result;
}
}
配置
xmlns:hystrix="http://www.springframework.org/schema/p"dubbo:consumer timeout="60000" check="false" filter="hystrixFilter,consumerLogFilter" hystrix:concurrency="40" /
dubbo:reference id="getUserInfoByIdService" interface="com.cmiot.ums.api.user.GetUserInfoByIdService" hystrix:hgroup="ums" /
xmlns:hystrix="http://www.springframework.org/schema/p"dubbo:consumer timeout="60000" check="false" filter="hystrixFilter,consumerLogFilter" hystrix:concurrency="40" /
dubbo:reference id="getUserInfoByIdService" interface="com.cmiot.ums.api.user.GetUserInfoByIdService" hystrix:hgroup="ums" /
HystrixCommand和HystrixObservableCommand
HystrixObservableCommand只支持异步的调用方式,HystrixCommand同步异步都支持。
HystrixObservableCommand支持请求合并功能,HystrixCommand不支持。
隔离粒度
对于未配置hystrix:hgroup的消费者不进行限流和熔断,对于配置了hystrix:hgroup的消费方,默认的最大并发量为40,隔离粒度根据配置自定义。考虑引入服务等级的概念,对于重要的服务默认采用接口级别的隔离粒度,对于非重要框架,每个模块的每个等级进行隔离,实现对每个服务等级进行动态调整。当服务器资源不够用时,可临时限制或关闭非核心服务的功能。
动态配置
尝试了使用单独的配置文件去管理hystrix的配置,但由于我们需要使用dubbo的url中的参数对服务进行分组,因此如果用独立的配置文件,配置会比较分散,不易于维护。因此仍然利用dubbo的配置,dubbo的配置也有很多方式,在消费端配置或者在服务端配置,用单一的注册中心配置还是分模块的多注册中心配置。最终我们仍然决定用在消费端的单一注册中心配置。理由如下:
服务治理上,服务消费方更清楚服务的使用场景,包括并发量,重要性等,如何降级,容错等等。因此,配置在服务消费方比在提供方更合理。
为了解决在服务消费方无法对所使用的服务进行逻辑上的分组,方便分组的统一配置,曾考虑使用多注册中心的方式,按照每个模块使用单独的注册中配置,但是需要每个服务提供方都去修改注册中心,改动较大,暂时不采用。
当隔离粒度为模块时,如果需要变更模块的配置,目前不太方便,需要对消费方这个分组内的所有接口配置,并且可能由于配置时的疏忽造成某个接口与其他接口的配置不同。
hystrix-dashboard
由于hystrix原生的event-stream是基于servlet容器的,应用平台未使用基于servlet容器的方案,因此对event-stream进行了扩展,方便对接口运行状况进行实时监控统计。 注意hystrix-dashborad在监控过程中的请求会被handle住,因此需要配置最大连接数
隔离策略
线程池策略比较信号量的优势是能够以非阻塞的方式进行调用,并且通过对单个接口的压测显示性能稍好于信号量。同时,线程池的方式支持缓冲队列。
信号量比较线程池策略的优势是:相比较动态调整大小的开销比较小,经过对单个接口的测试,对CPU的消耗比线程池小。
两种方式混用
hystrix关键配置
组名(commandGroupKey)
用于统计报表,通知,仪表盘或服务归属之类的,使用应用平台模块名称作为commandGroupKey。
命令名(commandKey)
用于监控,熔断器开关,缓存等作用,也是比较关键的,决定了隔离的粒度,缺省默认使用类名作为key,即根据当前类名作为隔离粒度,但由于集成采用公用的dubbofilter的机制,所有的名称都一样,应用平台根据服务等级,大于normal等级的使用接口作为隔离粒度。其他使用模块级别的隔离粒度。
命令配置(commandProperties)
包含隔离策略配置,线程池大小,信号量大小,超时配置,熔断功能配置,降级配置,监控配置等关键配置信息,每个隔离的单元使用独立的配置。
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(group)) // 组名使用模块名称
// 服务等级为NORMAL的隔离粒度为模块,其他服务等级的隔离粒度为接口
.andCommandKey(HystrixCommandKey.Factory.asKey(group))
// 通用配置
.andCommandPropertiesDefaults( HystrixCommandProperties.Setter()
// 是否开启熔断
.withCircuitBreakerEnabled(true)
// 触发熔断的错误率
.withCircuitBreakerErrorThresholdPercentage(50)
// 强制关闭熔断器
.withCircuitBreakerForceClosed(false)
// 强制打开熔断器
.withCircuitBreakerForceOpen(false)
// 触发熔断器需要的请求量
.withCircuitBreakerRequestVolumeThreshold(20)
// 熔断器从打开到半开的等待时间
.withCircuitBreakerSleepWindowInMilliseconds(5000)
// 使用信号量隔离的方式 默认:线程池
.withExecutionIsolationStrategy(ExecutionIsolationStrategy.SEMAPHORE)
// .withExecutionIsolationStrategy(ExecutionIsolationStrategy.THREAD)
// 信号量阈值 默认:10
.withExecutionIsolationSemaphoreMaxConcurrentRequests(invoker.getUrl().getParameter(HystrixConstants.CONCURRENCY_KEY, HystrixConstants.DEFAULT_MAX_CONCURRENCY))
// // 是否开启超时时间中断抛出异常的功能
.withExecutionTimeoutEnabled(false)
// 超时后是否中断线程
// .withExecutionIsolationThreadInterruptOnTimeout(true)
// // 超时时间 默认:1000// .withExecutionTimeoutInMilliseconds(invoker.getUrl().getParameter(Constants.TIMEOUT_KEY, HystrixConstants.DEFAULT_TIMEOUT_MILLSECOND))
// 是否开启降级
.withFallbackEnabled(true)
// 信号量隔离时,允许请求降级的最大并发数
.withFallbackIsolationSemaphoreMaxConcurrentRequests(10)
// 计算错误率的间隔时间
.withMetricsHealthSnapshotIntervalInMilliseconds(500)
// 设置每个bucket内执行的次数,如果超过这个次数,丢弃最早的,加入最新的
.withMetricsRollingPercentileBucketSize(100)
// 是否开启监控统计功能,如果设置false,任何统计都返回-1
.withMetricsRollingPercentileEnabled(true)
// 用于计算百分比的滚动窗口内buckets的个数
.withMetricsRollingPercentileWindowBuckets(6)
// 用于计算百分比的滚动窗口时间长度
.withMetricsRollingPercentileWindowInMilliseconds(60000)
// 可统计的滚动窗口内的buckets数量,用于熔断器和指标发布
.withMetricsRollingStatisticalWindowBuckets(10)
// 可统计的滚动窗口的时间长度,这段时间内的执行数据用于熔断器和指标发布
.withMetricsRollingStatisticalWindowInMilliseconds(10000)
// 是否开启缓存
.withRequestCacheEnabled(true)
// 是否开启日志
.withRequestLogEnabled(true)
)
// 线程池策略时的配置
// .andThreadPoolPropertiesDefaults(// HystrixThreadPoolProperties.Setter()
// .withCoreSize(10)
// .withKeepAliveTimeMinutes(5)
// .withMaxQueueSize(-1)
// .withMetricsRollingStatisticalWindowBuckets(10)
// .withMetricsRollingStatisticalWindowInMilliseconds(10)
// .withQueueSizeRejectionThreshold(20)
// )
);
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(group)) // 组名使用模块名称
// 服务等级为NORMAL的隔离粒度为模块,其他服务等级的隔离粒度为接口
.andCommandKey(HystrixCommandKey.Factory.asKey(group))
// 通用配置
.andCommandPropertiesDefaults( HystrixCommandProperties.Setter()
// 是否开启熔断
.withCircuitBreakerEnabled(true)
// 触发熔断的错误率
.withCircuitBreakerErrorThresholdPercentage(50)
// 强制关闭熔断器
.withCircuitBreakerForceClosed(false)
// 强制打开熔断器
.withCircuitBreakerForceOpen(false)
// 触发熔断器需要的请求量
.withCircuitBreakerRequestVolumeThreshold(20)
// 熔断器从打开到半开的等待时间
.withCircuitBreakerSleepWindowInMilliseconds(5000)
// 使用信号量隔离的方式 默认:线程池
.withExecutionIsolationStrategy(ExecutionIsolationStrategy.SEMAPHORE)
// .withExecutionIsolationStrategy(ExecutionIsolationStrategy.THREAD)
// 信号量阈值 默认:10
.withExecutionIsolationSemaphoreMaxConcurrentRequests(invoker.getUrl().getParameter(HystrixConstants.CONCURRENCY_KEY, HystrixConstants.DEFAULT_MAX_CONCURRENCY))
// // 是否开启超时时间中断抛出异常的功能
.withExecutionTimeoutEnabled(false)
// 超时后是否中断线程
// .withExecutionIsolationThreadInterruptOnTimeout(true)
// // 超时时间 默认:1000// .withExecutionTimeoutInMilliseconds(invoker.getUrl().getParameter(Constants.TIMEOUT_KEY, HystrixConstants.DEFAULT_TIMEOUT_MILLSECOND))
// 是否开启降级
.withFallbackEnabled(true)
// 信号量隔离时,允许请求降级的最大并发数
.withFallbackIsolationSemaphoreMaxConcurrentRequests(10)
// 计算错误率的间隔时间
.withMetricsHealthSnapshotIntervalInMilliseconds(500)
// 设置每个bucket内执行的次数,如果超过这个次数,丢弃最早的,加入最新的
.withMetricsRollingPercentileBucketSize(100)
// 是否开启监控统计功能,如果设置false,任何统计都返回-1
.withMetricsRollingPercentileEnabled(true)
// 用于计算百分比的滚动窗口内buckets的个数
.withMetricsRollingPercentileWindowBuckets(6)
// 用于计算百分比的滚动窗口时间长度
.withMetricsRollingPercentileWindowInMilliseconds(60000)
// 可统计的滚动窗口内的buckets数量,用于熔断器和指标发布
.withMetricsRollingStatisticalWindowBuckets(10)
// 可统计的滚动窗口的时间长度,这段时间内的执行数据用于熔断器和指标发布
.withMetricsRollingStatisticalWindowInMilliseconds(10000)
// 是否开启缓存
.withRequestCacheEnabled(true)
// 是否开启日志
.withRequestLogEnabled(true)
)
// 线程池策略时的配置
// .andThreadPoolPropertiesDefaults(// HystrixThreadPoolProperties.Setter()
// .withCoreSize(10)
// .withKeepAliveTimeMinutes(5)
// .withMaxQueueSize(-1)
// .withMetricsRollingStatisticalWindowBuckets(10)
// .withMetricsRollingStatisticalWindowInMilliseconds(10)
// .withQueueSizeRejectionThreshold(20)
// )
);
springMVC与Hystrix
为每个url或者servlet实现单独的HystrixCommand,来达到隔离,限流,熔断,降级的目的
在servlet中的公共filter中实现
在调用内部或者外部服务时用aop的方式实现。
hystrix依赖
hystrix-core中只有三个依赖
archaius-core,用来实现动态配置,支持扩展第三方工具
rxjava,是hystrix最核心的一部分,是实现异步调用的核心,基于观察者模式的扩展,支持基于jvm的语言,例如:scala,groovy等。
HdrHistogram,在hystrix运行时对相关的metrixs进行收集,支持扩展第三方工具