最近看了下关于分布式限流的部分,看到Sentinel的分布式限流,也就是集群限流的部分,想搭个环境看看,结果发现网上关于这方面的内容基本可以说没有,你甚至很难跑起来他的demo,就算能跑起来,估计也得自己研究半天,麻烦的要死。
我猜测很重要的原因可能就是Sentinel关于这块做的并不完善,而且从官方的Issue中能看出来,其实官方对于这块后续并没有计划去做的更好。
那么废话不多说,在此之前,肯定要先说下关于Sentinel集群限流方面的原理,没有原理一切都是空中楼阁。
原理这方面比较好解释,就是在原本的限流规则中加了一个clusterMode
参数,如果是true
的话,那么会走集群限流的模式,反之就是单机限流。
如果是集群限流,判断身份是限流客户端还是限流服务端,客户端则和服务端建立通信,所有的限流都通过和服务端的交互来达到效果。
对于Sentinel集群限流,包含两种模式,内嵌式和独立式。
什么是内嵌式呢,简单来说,要限流那么必然要有个服务端去处理多个客户端的限流请求,对于内嵌式来说呢,就是整个微服务集群内部选择一台机器节点作为限流服务端(Sentinel把这个叫做token-server),其他的微服务机器节点作为限流的客户端(token-client),这样的做法有缺点也有优点。
限流-嵌入式
首先说优点:这种方式部署不需要独立部署限流服务端,节省独立部署服务端产生的额外服务器开支,降低部署和维护复杂度。
再说缺点,缺点的话也可以说是整个Sentinel在集群限流这方面做得不够好的问题。
先说第一个缺点:无自动故障转移机制。
无论是内嵌式还是独立式的部署方案,都无法做到自动的故障转移。
所有的server和client都需要事先知道IP的请求下做出配置,如果server挂了,需要手动的修改配置,否则集群限流会退化成单机限流。
比如你的交易服务有3台机器A\B\C,其中A被手动设置为server,B\C则是作为client,当A服务器宕机之后,需要手动修改B\C中一台作为server,否则整个集群的机器都将退化回单机限流的模式。
但是,如果client挂了,则是不会影响到整个集群限流的,比如B挂了,那么A和C将会继续组成集群限流。
如果B再次重启成功,那么又会重新加入到整个集群限流当中来,因为会有一个自动重连的机制,默认的时间是N*2秒,逐渐递增的一个时间。
这是想用Sentinel做集群限流并且使用内嵌式需要考虑的问题,要自己去实现自动故障转移的机制,当然,server节点选举也要自己实现了。
对于这个问题,官方提供了可以修改server/client的API接口,另外一个就是可以基于动态的数据源配置方式,这个我们后面再谈。
第二个缺点:适用于单微服务集群内部限流。
这个其实也是显而易见的道理,都内部选举一台作为server去限流了,如果还跨多个微服务的话,显然是不太合理的行为,现实中这种情况肯定也是非常少见的了,当然你非要想跨多个微服务集群也不是不可以,只要你开心就好。
第三个缺点:server节点的机器性能会受到一定程度的影响。
这个肯定也比较好理解的,作为server去限流,那么其他的客户端肯定要和server去通信才能做到集群限流啊,对不对,所以一定程度上肯定会影响到server节点本身服务的性能,但是我觉得问题不大,就当server节点多了一个流量比较大的接口好了。
具体上会有多大的影响,我没有实际对这块做出实际的测试,如果真的流量非常大,需要实际测试一下这方面的问题。
我认为影响还是可控的,本身server和client基于netty通信,通信的内容其实也非常的小。
说完内嵌式的这些点,然后再说独立式,也非常好理解,就是单独部署一台机器作为限流服务端server,就不在本身微服务集群内部选一台作为server了。
限流-独立式
很明显,优点就是解决了上面的缺点。
优点可以说就是解决了内嵌式的两个缺点,那么缺点也来了,这同样也是Sentinel本身并没有帮助我们去解决的问题。
缺点一:需要独立部署,会产生额外的资源(钱)和运维复杂度
缺点二:server默认是单机,需要自己实现高可用方案
缺点二很致命啊,官方的server实现默认就是单机的,单点问题大家懂的都懂,自己实现高可用,我真的是有点服了。
这么说Sentinel这个集群限流就是简单的实现了一下,真正复杂的部分他都没管,你可以这么理解。
那基本原理大概了解之后,还是要真正跑起来看看效果的,毕竟开头我就说了,网上这方面真的是感觉啥也搜不到,下面以嵌入式集群的方式举例。
无论集群限流还是单机限流的方式,官方都支持写死配置和动态数据源的配置方式,写的话下面的代码中也都有,被我注释掉了,至于动态数据源的配置,会基于Apollo来实现。
理解一下动态数据源的配置方式,基于这个我们可以实现限流规则的动态刷新,还有重点的一点可以做到基于修改配置方式的半自动故障转移。
动态数据源支持推和拉两种方式,比如文件系统和Eureka就是拉取的方式,定时读取文件内容的变更,Eureka则是建立HTTP连接,定时获取元数据的变更。
推送的方式主要是基于事件监听机制,比如Apollo和Nacos,Redis官方则是基于Pub/Sub来实现,默认的实现方式是基于Lettuce,如果想用其他的客户端要自己实现。
限流-集群工作模式
首先,该引入的包还是引入。
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-annotation-aspectj</artifactId>
<version>1.8.4</version>
</dependency>
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-transport-simple-http</artifactId>
<version>1.8.4</version>
</dependency>
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-cluster-client-default</artifactId>
<version>1.8.4</version>
</dependency>
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-cluster-server-default</artifactId>
<version>1.8.4</version>
</dependency>
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-datasource-apollo</artifactId>
<version>1.8.4</version>
</dependency>
实现SPI,在resources
目录的META-INF/services
下新增名为com.alibaba.csp.sentinel.init.InitFunc
的文件,内容写上我们自己实现的类名,比如我的com.irving.demo.init.DemoClusterInitFunc
。
实现InitFunc
接口,重写init
方法,代码直接贴出来,这里整体依赖的是Apollo的配置方式,注释的部分是我在测试的时候写死代码的配置方式,也是可以用的。
public class DemoClusterInitFunc implements InitFunc {
private final String namespace = "application";
private final String ruleKey = "demo_sentinel";
private final String ruleServerKey = "demo_cluster";
private final String defaultRuleValue = "[]";
@Override
public void init() throws Exception {
// 初始化 限流规则
initDynamicRuleProperty();
//初始化 客户端配置
initClientConfigProperty();
// 初始化 服务端配置信息
initClientServerAssignProperty();
registerClusterRuleSupplier();
// token-server的传输规则
initServerTransportConfigProperty();
// 初始化 客户端和服务端状态
initStateProperty();
}
/**
* 限流规则和热点限流规则配置
*/
private void initDynamicRuleProperty() {
ReadableDataSource<String, List<FlowRule>> ruleSource = new ApolloDataSource<>(namespace, ruleKey,
defaultRuleValue, source -> JSON.parseObject(source, new TypeReference<List<FlowRule>>() {
}));
FlowRuleManager.register2Property(ruleSource.getProperty());
ReadableDataSource<String, List<ParamFlowRule>> paramRuleSource = new ApolloDataSource<>(namespace, ruleKey,
defaultRuleValue, source -> JSON.parseObject(source, new TypeReference<List<ParamFlowRule>>() {
}));
ParamFlowRuleManager.register2Property(paramRuleSource.getProperty());
}
/**
* 客户端配置,注释的部分是通过Apollo配置,只有一个配置我就省略了
*/
private void initClientConfigProperty() {
// ReadableDataSource<String, ClusterClientConfig> clientConfigDs = new ApolloDataSource<>(namespace, ruleKey,
// defaultRuleValue, source -> JSON.parseObject(source, new TypeReference<ClusterClientConfig>() {
// }));
// ClusterClientConfigManager.registerClientConfigProperty(clientConfigDs.getProperty());
ClusterClientConfig clientConfig = new ClusterClientConfig();
clientConfig.setRequestTimeout(1000);
ClusterClientConfigManager.applyNewConfig(clientConfig);
}
/**
* client->server 传输配置,设置端口号,注释的部分是写死的配置方式
*/
private void initServerTransportConfigProperty() {
ReadableDataSource<String, ServerTransportConfig> serverTransportDs = new ApolloDataSource<>(namespace, ruleServerKey,
defaultRuleValue, source -> {
List<ClusterGroupEntity> groupList = JSON.parseObject(source, new TypeReference<List<ClusterGroupEntity>>() {
});
ServerTransportConfig serverTransportConfig = Optional.ofNullable(groupList)
.flatMap(this::extractServerTransportConfig)
.orElse(null);
return serverTransportConfig;
});
ClusterServerConfigManager.registerServerTransportProperty(serverTransportDs.getProperty());
// ClusterServerConfigManager.loadGlobalTransportConfig(new ServerTransportConfig().setIdleSeconds(600).setPort(transPort));
}
private void registerClusterRuleSupplier() {
ClusterFlowRuleManager.setPropertySupplier(namespace -> {
ReadableDataSource<String, List<FlowRule>> ds = new ApolloDataSource<>(this.namespace, ruleKey,
defaultRuleValue, source -> JSON.parseObject(source, new TypeReference<List<FlowRule>>() {
}));
return ds.getProperty();
});
ClusterParamFlowRuleManager.setPropertySupplier(namespace -> {
ReadableDataSource<String, List<ParamFlowRule>> ds = new ApolloDataSource<>(this.namespace, ruleKey,
defaultRuleValue, source -> JSON.parseObject(source, new TypeReference<List<ParamFlowRule>>() {
}));
return ds.getProperty();
});
}
/**
* 服务端配置,设置server端口和IP,注释的配置是写死的方式,这个在服务端是不用配置的,只有客户端需要配置用来连接服务端
*/
private void initClientServerAssignProperty() {
ReadableDataSource<String, ClusterClientAssignConfig> clientAssignDs = new ApolloDataSource<>(namespace, ruleServerKey,
defaultRuleValue, source -> {
List<ClusterGroupEntity> groupList = JSON.parseObject(source, new TypeReference<List<ClusterGroupEntity>>() {
});
ClusterClientAssignConfig clusterClientAssignConfig = Optional.ofNullable(groupList)
.flatMap(this::extractClientAssignment)
.orElse(null);
return clusterClientAssignConfig;
});
ClusterClientConfigManager.registerServerAssignProperty(clientAssignDs.getProperty());
// ClusterClientAssignConfig serverConfig = new ClusterClientAssignConfig();
// serverConfig.setServerHost("127.0.0.1");
// serverConfig.setServerPort(transPort);
// ConfigSupplierRegistry.setNamespaceSupplier(() -> "trade-center");
// ClusterClientConfigManager.applyNewAssignConfig(serverConfig);
}
private Optional<ClusterClientAssignConfig> extractClientAssignment(List<ClusterGroupEntity> groupList) {
ClusterGroupEntity tokenServer = groupList.stream().filter(x -> x.getState().equals(ClusterStateManager.CLUSTER_SERVER)).findFirst().get();
Integer currentMachineState = Optional.ofNullable(groupList).map(s -> groupList.stream().filter(this::machineEqual).findFirst().get().getState()).orElse(ClusterStateManager.CLUSTER_NOT_STARTED);
if (currentMachineState.equals(ClusterStateManager.CLUSTER_CLIENT)) {
String ip = tokenServer.getIp();
Integer port = tokenServer.getPort();
return Optional.of(new ClusterClientAssignConfig(ip, port));
}
return Optional.empty();
}
/**
* 初始化客户端和服务端状态,注释的也是写死的配置方式
*/
private void initStateProperty() {
ReadableDataSource<String, Integer> clusterModeDs = new ApolloDataSource<>(namespace, ruleServerKey,
defaultRuleValue, source -> {
List<ClusterGroupEntity> groupList = JSON.parseObject(source, new TypeReference<List<ClusterGroupEntity>>() {
});
Integer state = Optional.ofNullable(groupList).map(s -> groupList.stream().filter(this::machineEqual).findFirst().get().getState()).orElse(ClusterStateManager.CLUSTER_NOT_STARTED);
return state;
});
ClusterStateManager.registerProperty(clusterModeDs.getProperty());
// ClusterStateManager.applyState(ClusterStateManager.CLUSTER_SERVER);
}
private Optional<ServerTransportConfig> extractServerTransportConfig(List<ClusterGroupEntity> groupList) {
return groupList.stream()
.filter(x -> x.getMachineId().equalsIgnoreCase(getCurrentMachineId()) && x.getState().equals(ClusterStateManager.CLUSTER_SERVER))
.findAny()
.map(e -> new ServerTransportConfig().setPort(e.getPort()).setIdleSeconds(600));
}
private boolean machineEqual(/*@Valid*/ ClusterGroupEntity group) {
return getCurrentMachineId().equals(group.getMachineId());
}
private String getCurrentMachineId() {
// 通过-Dcsp.sentinel.api.port=8719 配置, 默认8719,随后递增
return HostNameUtil.getIp() + SEPARATOR + TransportConfig.getPort();
}
private static final String SEPARATOR = "@";
}
基础类,定义配置的基础信息。
@Data
public class ClusterGroupEntity {
private String machineId;
private String ip;
private Integer port;
private Integer state;
}
然后是Apollo中的限流规则的配置和server/client集群关系的配置。
需要说明一下的就是flowId
,这个是区分限流规则的全局唯一ID,必须要有,否则集群限流会有问题。
thresholdType
代表限流模式,默认是0,代表单机均摊,比如这里count
限流QPS=20,有3台机器,那么集群限流阈值就是60,如果是1代表全局阈值,也就是count
配置的值就是集群限流的上限。
demo_sentinel=[
{
"resource": "test_res", //限流资源名
"count": 20, //集群限流QPS
"clusterMode": true, //true为集群限流模式
"clusterConfig": {
"flowId": 111, //这个必须得有,否则会有问题
"thresholdType": 1 //限流模式,默认为0单机均摊,1是全局阈值
}
}
]
demo_cluster=[
{
"ip": "192.168.3.20",
"machineId": "192.168.3.20@8720",
"port": 9999, //server和client通信接口
"state": 1 //指定为server
},
{
"ip": "192.168.3.20",
"machineId": "192.168.3.20@8721",
"state": 0
},
{
"ip": "192.168.3.20",
"machineId": "192.168.3.20@8722",
"state": 0
}
]
OK,到这里代码和配置都已经OK,还需要跑起来Sentinel控制台,这个不用教,还有启动参数。
本地可以直接跑多个客户端,注意修改端口号:-Dserver.port=9100 -Dcsp.sentinel.api.port=8720
这两个一块改,至于怎么连Apollo这块我就省略了,自己整吧,公司应该都有,不行的话用代码里的写死的方式也可以用。
-Dserver.port=9100 -Dcsp.sentinel.api.port=8720 -Dcsp.sentinel.dashboard.server=localhost:8080 -Dcsp.sentinel.log.use.pid=true
因为有流量之后控制台才能看到限流的情况,所以用官方给的限流测试代码修改一下,放到Springboot启动类中,触发限流规则的初始化。
@SpringBootApplication
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
new FlowQpsDemo();
}
}
测试限流代码:
public class FlowQpsDemo {
private static final String KEY = "test_res";
private static AtomicInteger pass = new AtomicInteger();
private static AtomicInteger block = new AtomicInteger();
private static AtomicInteger total = new AtomicInteger();
private static volatile boolean stop = false;
private static final int threadCount = 32;
private static int seconds = 60 + 40;
public FlowQpsDemo() {
tick();
simulateTraffic();
}
private static void simulateTraffic() {
for (int i = 0; i < threadCount; i++) {
Thread t = new Thread(new RunTask());
t.setName("simulate-traffic-Task");
t.start();
}
}
private static void tick() {
Thread timer = new Thread(new TimerTask());
timer.setName("sentinel-timer-task");
timer.start();
}
static class TimerTask implements Runnable {
@Override
public void run() {
long start = System.currentTimeMillis();
System.out.println("begin to statistic!!!");
long oldTotal = 0;
long oldPass = 0;
long oldBlock = 0;
while (!stop) {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
long globalTotal = total.get();
long oneSecondTotal = globalTotal - oldTotal;
oldTotal = globalTotal;
long globalPass = pass.get();
long oneSecondPass = globalPass - oldPass;
oldPass = globalPass;
long globalBlock = block.get();
long oneSecondBlock = globalBlock - oldBlock;
oldBlock = globalBlock;
System.out.println(seconds + " send qps is: " + oneSecondTotal);
System.out.println(TimeUtil.currentTimeMillis() + ", total:" + oneSecondTotal
+ ", pass:" + oneSecondPass
+ ", block:" + oneSecondBlock);
if (seconds-- <= 0) {
// stop = true;
}
}
long cost = System.currentTimeMillis() - start;
System.out.println("time cost: " + cost + " ms");
System.out.println("total:" + total.get() + ", pass:" + pass.get()
+ ", block:" + block.get());
System.exit(0);
}
}
static class RunTask implements Runnable {
@Override
public void run() {
while (!stop) {
Entry entry = null;
try {
entry = SphU.entry(KEY);
// token acquired, means pass
pass.addAndGet(1);
} catch (BlockException e1) {
block.incrementAndGet();
} catch (Exception e2) {
// biz exception
} finally {
total.incrementAndGet();
if (entry != null) {
entry.exit();
}
}
Random random2 = new Random();
try {
TimeUnit.MILLISECONDS.sleep(random2.nextInt(50));
} catch (InterruptedException e) {
// ignore
}
}
}
}
}
启动之后查看控制台,可以看到嵌入式的集群服务端已经启动好。
查看限流的情况:
最后为了测试效果,再启动一个客户端,修改端口号为9200和8721,可以看到新的客户端已经连接到了服务端,不过这里显示的总QPS 30000和我们配置的不符,这个不用管他。
好了,这个就是集群限流原理和使用配置方式,当然了,你可以启动多台服务,然后手动修改Apollo中的state
参数修改服务端,验证修改配置的方式是否能实现故障转移机制,另外就是关闭client或者server验证是否回退到单机限流的情况,这里就不一一测试了,因为我已经测试过了呀。
对于独立式的部署方式基本也是一样的,只是单独启动一个服务端的服务,需要手动配置server,而嵌入式的则不需要,loadServerNamespaceSet
配置为自己的服务名称即可。
ClusterTokenServer tokenServer = new SentinelDefaultTokenServer();
ClusterServerConfigManager.loadGlobalTransportConfig(new ServerTransportConfig()
.setIdleSeconds(600)
.setPort(11111));
ClusterServerConfigManager.loadServerNamespaceSet(Collections.singleton(DemoConstants.APP_NAME));
tokenServer.start();
OK,这就是本期的所有内容。
- END -
本文由哈喽比特于2年以前收录,如有侵权请联系我们。
文章来源:https://mp.weixin.qq.com/s/D6zaWad0SqPJIH1SOE7aQw
京东创始人刘强东和其妻子章泽天最近成为了互联网舆论关注的焦点。有关他们“移民美国”和在美国购买豪宅的传言在互联网上广泛传播。然而,京东官方通过微博发言人发布的消息澄清了这些传言,称这些言论纯属虚假信息和蓄意捏造。
日前,据博主“@超能数码君老周”爆料,国内三大运营商中国移动、中国电信和中国联通预计将集体采购百万台规模的华为Mate60系列手机。
据报道,荷兰半导体设备公司ASML正看到美国对华遏制政策的负面影响。阿斯麦(ASML)CEO彼得·温宁克在一档电视节目中分享了他对中国大陆问题以及该公司面临的出口管制和保护主义的看法。彼得曾在多个场合表达了他对出口管制以及中荷经济关系的担忧。
今年早些时候,抖音悄然上线了一款名为“青桃”的 App,Slogan 为“看见你的热爱”,根据应用介绍可知,“青桃”是一个属于年轻人的兴趣知识视频平台,由抖音官方出品的中长视频关联版本,整体风格有些类似B站。
日前,威马汽车首席数据官梅松林转发了一份“世界各国地区拥车率排行榜”,同时,他发文表示:中国汽车普及率低于非洲国家尼日利亚,每百户家庭仅17户有车。意大利世界排名第一,每十户中九户有车。
近日,一项新的研究发现,维生素 C 和 E 等抗氧化剂会激活一种机制,刺激癌症肿瘤中新血管的生长,帮助它们生长和扩散。
据媒体援引消息人士报道,苹果公司正在测试使用3D打印技术来生产其智能手表的钢质底盘。消息传出后,3D系统一度大涨超10%,不过截至周三收盘,该股涨幅回落至2%以内。
9月2日,坐拥千万粉丝的网红主播“秀才”账号被封禁,在社交媒体平台上引发热议。平台相关负责人表示,“秀才”账号违反平台相关规定,已封禁。据知情人士透露,秀才近期被举报存在违法行为,这可能是他被封禁的部分原因。据悉,“秀才”年龄39岁,是安徽省亳州市蒙城县人,抖音网红,粉丝数量超1200万。他曾被称为“中老年...
9月3日消息,亚马逊的一些股东,包括持有该公司股票的一家养老基金,日前对亚马逊、其创始人贝索斯和其董事会提起诉讼,指控他们在为 Project Kuiper 卫星星座项目购买发射服务时“违反了信义义务”。
据消息,为推广自家应用,苹果现推出了一个名为“Apps by Apple”的网站,展示了苹果为旗下产品(如 iPhone、iPad、Apple Watch、Mac 和 Apple TV)开发的各种应用程序。
特斯拉本周在美国大幅下调Model S和X售价,引发了该公司一些最坚定支持者的不满。知名特斯拉多头、未来基金(Future Fund)管理合伙人加里·布莱克发帖称,降价是一种“短期麻醉剂”,会让潜在客户等待进一步降价。
据外媒9月2日报道,荷兰半导体设备制造商阿斯麦称,尽管荷兰政府颁布的半导体设备出口管制新规9月正式生效,但该公司已获得在2023年底以前向中国运送受限制芯片制造机器的许可。
近日,根据美国证券交易委员会的文件显示,苹果卫星服务提供商 Globalstar 近期向马斯克旗下的 SpaceX 支付 6400 万美元(约 4.65 亿元人民币)。用于在 2023-2025 年期间,发射卫星,进一步扩展苹果 iPhone 系列的 SOS 卫星服务。
据报道,马斯克旗下社交平台𝕏(推特)日前调整了隐私政策,允许 𝕏 使用用户发布的信息来训练其人工智能(AI)模型。新的隐私政策将于 9 月 29 日生效。新政策规定,𝕏可能会使用所收集到的平台信息和公开可用的信息,来帮助训练 𝕏 的机器学习或人工智能模型。
9月2日,荣耀CEO赵明在采访中谈及华为手机回归时表示,替老同事们高兴,觉得手机行业,由于华为的回归,让竞争充满了更多的可能性和更多的魅力,对行业来说也是件好事。
《自然》30日发表的一篇论文报道了一个名为Swift的人工智能(AI)系统,该系统驾驶无人机的能力可在真实世界中一对一冠军赛里战胜人类对手。
近日,非营利组织纽约真菌学会(NYMS)发出警告,表示亚马逊为代表的电商平台上,充斥着各种AI生成的蘑菇觅食科普书籍,其中存在诸多错误。
社交媒体平台𝕏(原推特)新隐私政策提到:“在您同意的情况下,我们可能出于安全、安保和身份识别目的收集和使用您的生物识别信息。”
2023年德国柏林消费电子展上,各大企业都带来了最新的理念和产品,而高端化、本土化的中国产品正在不断吸引欧洲等国际市场的目光。
罗永浩日前在直播中吐槽苹果即将推出的 iPhone 新品,具体内容为:“以我对我‘子公司’的了解,我认为 iPhone 15 跟 iPhone 14 不会有什么区别的,除了序(列)号变了,这个‘不要脸’的东西,这个‘臭厨子’。