最近在github上看了soul网关的设计,突然就来了兴趣准备自己从零开始写一个高性能的网关。经过两周时间的开发,我的网关ship-gate核心功能基本都已完成,最大的缺陷就是前端功底太差没有管理后台。
网关是所有请求的入口,所以要求有很高的吞吐量,为了实现这点可以使用请求异步化来解决。目前一般有以下两种方案:
Servlet3已经支持异步,这种方案使用比较多,京东,有赞和Zuul,都用的是这种方案。
Netty为高并发而生,目前唯品会的网关使用这个策略,在唯品会的技术文章中在相同的情况下Netty是每秒30w+的吞吐量,Tomcat是13w+,可以看出是有一定的差距的,但是Netty需要自己处理HTTP协议,这一块比较麻烦。
后面发现Soul网关是基于Spring WebFlux(底层Netty)的,不用太关心HTTP协议的处理,于是决定也用Spring WebFlux。
网关的第二个特点是具备可扩展性,比如Netflix Zuul有preFilters,postFilters等在不同的阶段方便处理不同的业务,基于责任链模式将请求进行链式处理即可实现。
在微服务架构下,服务都会进行多实例部署来保证高可用,请求到达网关时,网关需要根据URL找到所有可用的实例,这时就需要服务注册和发现功能,即注册中心。
现在流行的注册中心有Apache的Zookeeper和阿里的Nacos两种(consul有点小众),因为之前写RPC框架时已经用过了Zookeeper,所以这次就选择了Nacos。
首先要明确目标,即开发一个具备哪些特性的网关,总结下后如下:
自定义路由规则
可基于version的路由规则设置,路由对象包括DEFAUL,HEADER和QUERY三种,匹配方式包括=、regex、like三种。
跨语言
HTTP协议天生跨语言
高性能
Netty本身就是一款高性能的通信框架,同时server将一些路由规则等数据缓存到JVM内存避免请求admin服务。
高可用
支持集群模式防止单节点故障,无状态。
灰度发布
灰度发布(又名金丝雀发布)是指在黑与白之间,能够平滑过渡的一种发布方式。在其上可以进行A/B testing,即让一部分用户继续用产品特性A,一部分用户开始用产品特性B,如果用户对B没有什么反对意见,那么逐步扩大范围,把所有用户都迁移到B上面来。通过特性一可以实现。
接口鉴权
基于责任链模式,用户开发自己的鉴权插件即可。
负载均衡
支持多种负载均衡算法,如随机,轮询,加权轮询等。利用SPI机制可以根据配置进行动态加载。
在参考了一些优秀的网关Zuul,Spring Cloud Gateway,Soul后,将项目划分为以下几个模块。
它们之间的关系如图:
网关设计
注意: 这张图与实际实现有点出入,Nacos push到本地缓存的那个环节没有实现,目前只有ship-sever定时轮询pull的过程。ship-admin从Nacos获取注册服务信息的过程,也改成了ServiceA启动时主动发生HTTP请求通知ship-admin。
3.1 ship-client-spring-boot-starter
首先创建一个spring-boot-starter命名为ship-client-spring-boot-starter,不知道如何自定义starter的可以看我以前写的《开发自己的starter》。
其核心类 AutoRegisterListener 就是在项目启动时做了两件事:
1.将服务信息注册到Nacos注册中心
2.通知ship-admin服务上线了并注册下线hook。
代码如下:
* Created by 2YSP on 2020/12/21
*/
public class AutoRegisterListener implements ApplicationListener<ContextRefreshedEvent> {
private final static Logger LOGGER = LoggerFactory.getLogger(AutoRegisterListener.class);
private volatile AtomicBoolean registered = new AtomicBoolean(false);
private final ClientConfigProperties properties;
@NacosInjected
private NamingService namingService;
@Autowired
private RequestMappingHandlerMapping handlerMapping;
private final ExecutorService pool;
/**
* url list to ignore
*/
private static List<String> ignoreUrlList = new LinkedList<>();
static {
ignoreUrlList.add("/error");
}
public AutoRegisterListener(ClientConfigProperties properties) {
if (!check(properties)) {
LOGGER.error("client config port,contextPath,appName adminUrl and version can't be empty!");
throw new ShipException("client config port,contextPath,appName adminUrl and version can't be empty!");
}
this.properties = properties;
pool = new ThreadPoolExecutor(1, 4, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
}
/**
* check the ClientConfigProperties
*
* @param properties
* @return
*/
private boolean check(ClientConfigProperties properties) {
if (properties.getPort() == null| properties.getContextPath() == null
| properties.getVersion() == null| properties.getAppName() == null
| properties.getAdminUrl() == null) {
return false;
}
return true;
}
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
if (!registered.compareAndSet(false, true)) {
return;
}
doRegister();
registerShutDownHook();
}
/**
* send unregister request to admin when jvm shutdown
*/
private void registerShutDownHook() {
final String url = "http://" + properties.getAdminUrl() + AdminConstants.UNREGISTER_PATH;
final UnregisterAppDTO unregisterAppDTO = new UnregisterAppDTO();
unregisterAppDTO.setAppName(properties.getAppName());
unregisterAppDTO.setVersion(properties.getVersion());
unregisterAppDTO.setIp(IpUtil.getLocalIpAddress());
unregisterAppDTO.setPort(properties.getPort());
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
OkhttpTool.doPost(url, unregisterAppDTO);
LOGGER.info("[{}:{}] unregister from ship-admin success!", unregisterAppDTO.getAppName(), unregisterAppDTO.getVersion());
}));
}
/**
* register all interface info to register center
*/
private void doRegister() {
Instance instance = new Instance();
instance.setIp(IpUtil.getLocalIpAddress());
instance.setPort(properties.getPort());
instance.setEphemeral(true);
Map<String, String> metadataMap = new HashMap<>();
metadataMap.put("version", properties.getVersion());
metadataMap.put("appName", properties.getAppName());
instance.setMetadata(metadataMap);
try {
namingService.registerInstance(properties.getAppName(), NacosConstants.APP_GROUP_NAME, instance);
} catch (NacosException e) {
LOGGER.error("register to nacos fail", e);
throw new ShipException(e.getErrCode(), e.getErrMsg());
}
LOGGER.info("register interface info to nacos success!");
// send register request to ship-admin
String url = "http://" + properties.getAdminUrl() + AdminConstants.REGISTER_PATH;
RegisterAppDTO registerAppDTO = buildRegisterAppDTO(instance);
OkhttpTool.doPost(url, registerAppDTO);
LOGGER.info("register to ship-admin success!");
}
private RegisterAppDTO buildRegisterAppDTO(Instance instance) {
RegisterAppDTO registerAppDTO = new RegisterAppDTO();
registerAppDTO.setAppName(properties.getAppName());
registerAppDTO.setContextPath(properties.getContextPath());
registerAppDTO.setIp(instance.getIp());
registerAppDTO.setPort(instance.getPort());
registerAppDTO.setVersion(properties.getVersion());
return registerAppDTO;
}
}
3.2 ship-server
ship-sever项目主要包括了两个部分内容, 1.请求动态路由的主流程 2.本地缓存数据和ship-admin及nacos同步,这部分在后面3.3再讲。微信搜索公众号 逆锋起笔,关注后回复 编程资源,领取各种经典学习资料。
ship-server实现动态路由的原理是利用WebFilter拦截请求,然后将请求教给plugin chain去链式处理。
PluginFilter根据URL解析出appName,然后将启用的plugin组装成plugin chain。
public class PluginFilter implements WebFilter {
private ServerConfigProperties properties;
public PluginFilter(ServerConfigProperties properties) {
this.properties = properties;
}
@Override
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
String appName = parseAppName(exchange);
if (CollectionUtils.isEmpty(ServiceCache.getAllInstances(appName))) {
throw new ShipException(ShipExceptionEnum.SERVICE_NOT_FIND);
}
PluginChain pluginChain = new PluginChain(properties, appName);
pluginChain.addPlugin(new DynamicRoutePlugin(properties));
pluginChain.addPlugin(new AuthPlugin(properties));
return pluginChain.execute(exchange, pluginChain);
}
private String parseAppName(ServerWebExchange exchange) {
RequestPath path = exchange.getRequest().getPath();
String appName = path.value().split("/")[1];
return appName;
}
}```
PluginChain继承了AbstractShipPlugin并持有所有要执行的插件。
```java
* @Author: Ship
* @Description:
* @Date: Created in 2020/12/25
*/
public class PluginChain extends AbstractShipPlugin {
/**
* the pos point to current plugin
*/
private int pos;
/**
* the plugins of chain
*/
private List<ShipPlugin> plugins;
private final String appName;
public PluginChain(ServerConfigProperties properties, String appName) {
super(properties);
this.appName = appName;
}
/**
* add enabled plugin to chain
*
* @param shipPlugin
*/
public void addPlugin(ShipPlugin shipPlugin) {
if (plugins == null) {
plugins = new ArrayList<>();
}
if (!PluginCache.isEnabled(appName, shipPlugin.name())) {
return;
}
plugins.add(shipPlugin);
// order by the plugin's order
plugins.sort(Comparator.comparing(ShipPlugin::order));
}
@Override
public Integer order() {
return null;
}
@Override
public String name() {
return null;
}
@Override
public Mono<Void> execute(ServerWebExchange exchange, PluginChain pluginChain) {
if (pos == plugins.size()) {
return exchange.getResponse().setComplete();
}
return pluginChain.plugins.get(pos++).execute(exchange, pluginChain);
}
public String getAppName() {
return appName;
}
}
AbstractShipPlugin实现了ShipPlugin接口,并持有ServerConfigProperties配置对象。
public abstract class AbstractShipPlugin implements ShipPlugin {
protected ServerConfigProperties properties;
public AbstractShipPlugin(ServerConfigProperties properties) {
this.properties = properties;
}
}```
ShipPlugin接口定义了所有插件必须实现的三个方法order(),name()和execute()。
```java
public interface ShipPlugin {
/**
* lower values have higher priority
*
* @return
*/
Integer order();
/**
* return current plugin name
*
* @return
*/
String name();
Mono<Void> execute(ServerWebExchange exchange,PluginChain pluginChain);
}```
DynamicRoutePlugin继承了抽象类AbstractShipPlugin,包含了动态路由的主要业务逻辑。
```java
* @Author: Ship
* @Description:
* @Date: Created in 2020/12/25
*/
public class DynamicRoutePlugin extends AbstractShipPlugin {
private final static Logger LOGGER = LoggerFactory.getLogger(DynamicRoutePlugin.class);
private static WebClient webClient;
private static final Gson gson = new GsonBuilder().create();
static {
HttpClient httpClient = HttpClient.create()
.tcpConfiguration(client ->
client.doOnConnected(conn ->
conn.addHandlerLast(new ReadTimeoutHandler(3))
.addHandlerLast(new WriteTimeoutHandler(3)))
.option(ChannelOption.TCP_NODELAY, true)
);
webClient = WebClient.builder().clientConnector(new ReactorClientHttpConnector(httpClient))
.build();
}
public DynamicRoutePlugin(ServerConfigProperties properties) {
super(properties);
}
@Override
public Integer order() {
return ShipPluginEnum.DYNAMIC_ROUTE.getOrder();
}
@Override
public String name() {
return ShipPluginEnum.DYNAMIC_ROUTE.getName();
}
@Override
public Mono<Void> execute(ServerWebExchange exchange, PluginChain pluginChain) {
String appName = pluginChain.getAppName();
ServiceInstance serviceInstance = chooseInstance(appName, exchange.getRequest());
// LOGGER.info("selected instance is [{}]", gson.toJson(serviceInstance));
// request service
String url = buildUrl(exchange, serviceInstance);
return forward(exchange, url);
}
/**
* forward request to backend service
*
* @param exchange
* @param url
* @return
*/
private Mono<Void> forward(ServerWebExchange exchange, String url) {
ServerHttpRequest request = exchange.getRequest();
ServerHttpResponse response = exchange.getResponse();
HttpMethod method = request.getMethod();
WebClient.RequestBodySpec requestBodySpec = webClient.method(method).uri(url).headers((headers) -> {
headers.addAll(request.getHeaders());
});
WebClient.RequestHeadersSpec<?> reqHeadersSpec;
if (requireHttpBody(method)) {
reqHeadersSpec = requestBodySpec.body(BodyInserters.fromDataBuffers(request.getBody()));
} else {
reqHeadersSpec = requestBodySpec;
}
// nio->callback->nio
return reqHeadersSpec.exchange().timeout(Duration.ofMillis(properties.getTimeOutMillis()))
.onErrorResume(ex -> {
return Mono.defer(() -> {
String errorResultJson = "";
if (ex instanceof TimeoutException) {
errorResultJson = "{\"code\":5001,\"message\":\"network timeout\"}";
} else {
errorResultJson = "{\"code\":5000,\"message\":\"system error\"}";
}
return ShipResponseUtil.doResponse(exchange, errorResultJson);
}).then(Mono.empty());
}).flatMap(backendResponse -> {
response.setStatusCode(backendResponse.statusCode());
response.getHeaders().putAll(backendResponse.headers().asHttpHeaders());
return response.writeWith(backendResponse.bodyToFlux(DataBuffer.class));
});
}
/**
* weather the http method need http body
*
* @param method
* @return
*/
private boolean requireHttpBody(HttpMethod method) {
if (method.equals(HttpMethod.POST)| method.equals(HttpMethod.PUT)| method.equals(HttpMethod.PATCH)) {
return true;
}
return false;
}
private String buildUrl(ServerWebExchange exchange, ServiceInstance serviceInstance) {
ServerHttpRequest request = exchange.getRequest();
String query = request.getURI().getQuery();
String path = request.getPath().value().replaceFirst("/" + serviceInstance.getAppName(), "");
String url = "http://" + serviceInstance.getIp() + ":" + serviceInstance.getPort() + path;
if (!StringUtils.isEmpty(query)) {
url = url + "?" + query;
}
return url;
}
/**
* choose an ServiceInstance according to route rule config and load balancing algorithm
*
* @param appName
* @param request
* @return
*/
private ServiceInstance chooseInstance(String appName, ServerHttpRequest request) {
List<ServiceInstance> serviceInstances = ServiceCache.getAllInstances(appName);
if (CollectionUtils.isEmpty(serviceInstances)) {
LOGGER.error("service instance of {} not find", appName);
throw new ShipException(ShipExceptionEnum.SERVICE_NOT_FIND);
}
String version = matchAppVersion(appName, request);
if (StringUtils.isEmpty(version)) {
throw new ShipException("match app version error");
}
// filter serviceInstances by version
List<ServiceInstance> instances = serviceInstances.stream().filter(i -> i.getVersion().equals(version)).collect(Collectors.toList());
//Select an instance based on the load balancing algorithm
LoadBalance loadBalance = LoadBalanceFactory.getInstance(properties.getLoadBalance(), appName, version);
ServiceInstance serviceInstance = loadBalance.chooseOne(instances);
return serviceInstance;
}
private String matchAppVersion(String appName, ServerHttpRequest request) {
List<AppRuleDTO> rules = RouteRuleCache.getRules(appName);
rules.sort(Comparator.comparing(AppRuleDTO::getPriority).reversed());
for (AppRuleDTO rule : rules) {
if (match(rule, request)) {
return rule.getVersion();
}
}
return null;
}
private boolean match(AppRuleDTO rule, ServerHttpRequest request) {
String matchObject = rule.getMatchObject();
String matchKey = rule.getMatchKey();
String matchRule = rule.getMatchRule();
Byte matchMethod = rule.getMatchMethod();
if (MatchObjectEnum.DEFAULT.getCode().equals(matchObject)) {
return true;
} else if (MatchObjectEnum.QUERY.getCode().equals(matchObject)) {
String param = request.getQueryParams().getFirst(matchKey);
if (!StringUtils.isEmpty(param)) {
return StringTools.match(param, matchMethod, matchRule);
}
} else if (MatchObjectEnum.HEADER.getCode().equals(matchObject)) {
HttpHeaders headers = request.getHeaders();
String headerValue = headers.getFirst(matchKey);
if (!StringUtils.isEmpty(headerValue)) {
return StringTools.match(headerValue, matchMethod, matchRule);
}
}
return false;
}
}
app数据同步
后台服务(如订单服务)启动时,只将服务名,版本,ip地址和端口号注册到了Nacos,并没有实例的权重和启用的插件信息怎么办?
一般在线的实例权重和插件列表都是在管理界面配置,然后动态生效的,所以需要ship-admin定时更新实例的权重和插件信息到注册中心。
对应代码ship-admin的NacosSyncListener
* @Author: Ship
* @Description:
* @Date: Created in 2020/12/30
*/
@Configuration
public class NacosSyncListener implements ApplicationListener<ContextRefreshedEvent> {
private static final Logger LOGGER = LoggerFactory.getLogger(NacosSyncListener.class);
private static ScheduledThreadPoolExecutor scheduledPool = new ScheduledThreadPoolExecutor(1,
new ShipThreadFactory("nacos-sync", true).create());
@NacosInjected
private NamingService namingService;
@Value("${nacos.discovery.server-addr}")
private String baseUrl;
@Resource
private AppService appService;
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
if (event.getApplicationContext().getParent() != null) {
return;
}
String url = "http://" + baseUrl + NacosConstants.INSTANCE_UPDATE_PATH;
scheduledPool.scheduleWithFixedDelay(new NacosSyncTask(namingService, url, appService), 0, 30L, TimeUnit.SECONDS);
}
class NacosSyncTask implements Runnable {
private NamingService namingService;
private String url;
private AppService appService;
private Gson gson = new GsonBuilder().create();
public NacosSyncTask(NamingService namingService, String url, AppService appService) {
this.namingService = namingService;
this.url = url;
this.appService = appService;
}
/**
* Regular update weight,enabled plugins to nacos instance
*/
@Override
public void run() {
try {
// get all app names
ListView<String> services = namingService.getServicesOfServer(1, Integer.MAX_VALUE, NacosConstants.APP_GROUP_NAME);
if (CollectionUtils.isEmpty(services.getData())) {
return;
}
List<String> appNames = services.getData();
List<AppInfoDTO> appInfos = appService.getAppInfos(appNames);
for (AppInfoDTO appInfo : appInfos) {
if (CollectionUtils.isEmpty(appInfo.getInstances())) {
continue;
}
for (ServiceInstance instance : appInfo.getInstances()) {
Map<String, Object> queryMap = buildQueryMap(appInfo, instance);
String resp = OkhttpTool.doPut(url, queryMap, "");
LOGGER.debug("response :{}", resp);
}
}
} catch (Exception e) {
LOGGER.error("nacos sync task error", e);
}
}
private Map<String, Object> buildQueryMap(AppInfoDTO appInfo, ServiceInstance instance) {
Map<String, Object> map = new HashMap<>();
map.put("serviceName", appInfo.getAppName());
map.put("groupName", NacosConstants.APP_GROUP_NAME);
map.put("ip", instance.getIp());
map.put("port", instance.getPort());
map.put("weight", instance.getWeight().doubleValue());
NacosMetadata metadata = new NacosMetadata();
metadata.setAppName(appInfo.getAppName());
metadata.setVersion(instance.getVersion());
metadata.setPlugins(String.join(",", appInfo.getEnabledPlugins()));
map.put("metadata", StringTools.urlEncode(gson.toJson(metadata)));
map.put("ephemeral", true);
return map;
}
}
}
ship-server再定时从Nacos拉取app数据更新到本地Map缓存。
* @Author: Ship
* @Description: sync data to local cache
* @Date: Created in 2020/12/25
*/
@Configuration
public class DataSyncTaskListener implements ApplicationListener<ContextRefreshedEvent> {
private static ScheduledThreadPoolExecutor scheduledPool = new ScheduledThreadPoolExecutor(1,
new ShipThreadFactory("service-sync", true).create());
@NacosInjected
private NamingService namingService;
@Autowired
private ServerConfigProperties properties;
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
if (event.getApplicationContext().getParent() != null) {
return;
}
scheduledPool.scheduleWithFixedDelay(new DataSyncTask(namingService)
, 0L, properties.getCacheRefreshInterval(), TimeUnit.SECONDS);
WebsocketSyncCacheServer websocketSyncCacheServer = new WebsocketSyncCacheServer(properties.getWebSocketPort());
websocketSyncCacheServer.start();
}
class DataSyncTask implements Runnable {
private NamingService namingService;
public DataSyncTask(NamingService namingService) {
this.namingService = namingService;
}
@Override
public void run() {
try {
// get all app names
ListView<String> services = namingService.getServicesOfServer(1, Integer.MAX_VALUE, NacosConstants.APP_GROUP_NAME);
if (CollectionUtils.isEmpty(services.getData())) {
return;
}
List<String> appNames = services.getData();
// get all instances
for (String appName : appNames) {
List<Instance> instanceList = namingService.getAllInstances(appName, NacosConstants.APP_GROUP_NAME);
if (CollectionUtils.isEmpty(instanceList)) {
continue;
}
ServiceCache.add(appName, buildServiceInstances(instanceList));
List<String> pluginNames = getEnabledPlugins(instanceList);
PluginCache.add(appName, pluginNames);
}
ServiceCache.removeExpired(appNames);
PluginCache.removeExpired(appNames);
} catch (NacosException e) {
e.printStackTrace();
}
}
private List<String> getEnabledPlugins(List<Instance> instanceList) {
Instance instance = instanceList.get(0);
Map<String, String> metadata = instance.getMetadata();
// plugins: DynamicRoute,Auth
String plugins = metadata.getOrDefault("plugins", ShipPluginEnum.DYNAMIC_ROUTE.getName());
return Arrays.stream(plugins.split(",")).collect(Collectors.toList());
}
private List<ServiceInstance> buildServiceInstances(List<Instance> instanceList) {
List<ServiceInstance> list = new LinkedList<>();
instanceList.forEach(instance -> {
Map<String, String> metadata = instance.getMetadata();
ServiceInstance serviceInstance = new ServiceInstance();
serviceInstance.setAppName(metadata.get("appName"));
serviceInstance.setIp(instance.getIp());
serviceInstance.setPort(instance.getPort());
serviceInstance.setVersion(metadata.get("version"));
serviceInstance.setWeight((int) instance.getWeight());
list.add(serviceInstance);
});
return list;
}
}
}
路由规则数据同步
同时,如果用户在管理后台更新了路由规则,ship-admin需要推送规则数据到ship-server,这里参考了soul网关的做法利用websocket在第一次建立连接后进行全量同步,此后路由规则发生变更就只作增量同步。
服务端WebsocketSyncCacheServer:
* @Author: Ship
* @Description:
* @Date: Created in 2020/12/28
*/
public class WebsocketSyncCacheServer extends WebSocketServer {
private final static Logger LOGGER = LoggerFactory.getLogger(WebsocketSyncCacheServer.class);
private Gson gson = new GsonBuilder().create();
private MessageHandler messageHandler;
public WebsocketSyncCacheServer(Integer port) {
super(new InetSocketAddress(port));
this.messageHandler = new MessageHandler();
}
@Override
public void onOpen(WebSocket webSocket, ClientHandshake clientHandshake) {
LOGGER.info("server is open");
}
@Override
public void onClose(WebSocket webSocket, int i, String s, boolean b) {
LOGGER.info("websocket server close...");
}
@Override
public void onMessage(WebSocket webSocket, String message) {
LOGGER.info("websocket server receive message:\n[{}]", message);
this.messageHandler.handler(message);
}
@Override
public void onError(WebSocket webSocket, Exception e) {
}
@Override
public void onStart() {
LOGGER.info("websocket server start...");
}
class MessageHandler {
public void handler(String message) {
RouteRuleOperationDTO operationDTO = gson.fromJson(message, RouteRuleOperationDTO.class);
if (CollectionUtils.isEmpty(operationDTO.getRuleList())) {
return;
}
Map<String, List<AppRuleDTO>> map = operationDTO.getRuleList()
.stream().collect(Collectors.groupingBy(AppRuleDTO::getAppName));
if (OperationTypeEnum.INSERT.getCode().equals(operationDTO.getOperationType())
| OperationTypeEnum.UPDATE.getCode().equals(operationDTO.getOperationType())) {
RouteRuleCache.add(map);
} else if (OperationTypeEnum.DELETE.getCode().equals(operationDTO.getOperationType())) {
RouteRuleCache.remove(map);
}
}
}
}
客户端WebsocketSyncCacheClient:
* @Author: Ship
* @Description:
* @Date: Created in 2020/12/28
*/
@Component
public class WebsocketSyncCacheClient {
private final static Logger LOGGER = LoggerFactory.getLogger(WebsocketSyncCacheClient.class);
private WebSocketClient client;
private RuleService ruleService;
private Gson gson = new GsonBuilder().create();
public WebsocketSyncCacheClient(@Value("${ship.server-web-socket-url}") String serverWebSocketUrl,
RuleService ruleService) {
if (StringUtils.isEmpty(serverWebSocketUrl)) {
throw new ShipException(ShipExceptionEnum.CONFIG_ERROR);
}
this.ruleService = ruleService;
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1,
new ShipThreadFactory("websocket-connect", true).create());
try {
client = new WebSocketClient(new URI(serverWebSocketUrl)) {
@Override
public void onOpen(ServerHandshake serverHandshake) {
LOGGER.info("client is open");
List<AppRuleDTO> list = ruleService.getEnabledRule();
String msg = gson.toJson(new RouteRuleOperationDTO(OperationTypeEnum.INSERT, list));
send(msg);
}
@Override
public void onMessage(String s) {
}
@Override
public void onClose(int i, String s, boolean b) {
}
@Override
public void onError(Exception e) {
LOGGER.error("websocket client error", e);
}
};
client.connectBlocking();
//使用调度线程池进行断线重连,30秒进行一次
executor.scheduleAtFixedRate(() -> {
if (client != null && client.isClosed()) {
try {
client.reconnectBlocking();
} catch (InterruptedException e) {
LOGGER.error("reconnect server fail", e);
}
}
}, 10, 30, TimeUnit.SECONDS);
} catch (Exception e) {
LOGGER.error("websocket sync cache exception", e);
throw new ShipException(e.getMessage());
}
}
public <T> void send(T t) {
while (!client.getReadyState().equals(ReadyState.OPEN)) {
LOGGER.debug("connecting ...please wait");
}
client.send(gson.toJson(t));
}
}
1、本地启动nacos ,sh startup.sh -m standalone
2、启动ship-admin
3、本地启动两个ship-example实例。
实例1配置:
ship:
http:
app-name: order
version: gray_1.0
context-path: /order
port: 8081
admin-url: 127.0.0.1:9001
server:
port: 8081
nacos:
discovery:
server-addr: 127.0.0.1:8848
实例2配置:
ship:
http:
app-name: order
version: prod_1.0
context-path: /order
port: 8082
admin-url: 127.0.0.1:9001
server:
port: 8082
nacos:
discovery:
server-addr: 127.0.0.1:8848
4、在数据库添加路由规则配置,该规则表示当http header 中的name=ship时请求路由到gray_1.0版本的节点。
5、启动ship-server,看到以下日志时则可以进行测试了。
2021-01-02 19:57:09.159 INFO 30413 --- [SocketWorker-29] cn.sp.sync.WebsocketSyncCacheServer : websocket server receive message:
[{"operationType":"INSERT","ruleList":[{"id":1,"appId":5,"appName":"order","version":"gray_1.0","matchObject":"HEADER","matchKey":"name","matchMethod":1,"matchRule":"ship","priority":50}]}]
6、用Postman请求http://localhost:9000/order/user/add
,POST方式,header设置name=ship,可以看到只有实例1有日志显示。
==========add user,version:gray_1.0
压测环境:
压测结果
千里之行始于足下,开始以为写一个网关会很难,但当你实际开始行动时就会发现其实没那么难,所以迈出第一步很重要。过程中也遇到了很多问题,还在github上给soul和nacos这两个开源项目提了两个issue,后来发现是自己的问题,尴尬。
本文由哈喽比特于3年以前收录,如有侵权请联系我们。
文章来源:https://mp.weixin.qq.com/s/uYnOBRKHQxleSbmVd8S2IA
京东创始人刘强东和其妻子章泽天最近成为了互联网舆论关注的焦点。有关他们“移民美国”和在美国购买豪宅的传言在互联网上广泛传播。然而,京东官方通过微博发言人发布的消息澄清了这些传言,称这些言论纯属虚假信息和蓄意捏造。
日前,据博主“@超能数码君老周”爆料,国内三大运营商中国移动、中国电信和中国联通预计将集体采购百万台规模的华为Mate60系列手机。
据报道,荷兰半导体设备公司ASML正看到美国对华遏制政策的负面影响。阿斯麦(ASML)CEO彼得·温宁克在一档电视节目中分享了他对中国大陆问题以及该公司面临的出口管制和保护主义的看法。彼得曾在多个场合表达了他对出口管制以及中荷经济关系的担忧。
今年早些时候,抖音悄然上线了一款名为“青桃”的 App,Slogan 为“看见你的热爱”,根据应用介绍可知,“青桃”是一个属于年轻人的兴趣知识视频平台,由抖音官方出品的中长视频关联版本,整体风格有些类似B站。
日前,威马汽车首席数据官梅松林转发了一份“世界各国地区拥车率排行榜”,同时,他发文表示:中国汽车普及率低于非洲国家尼日利亚,每百户家庭仅17户有车。意大利世界排名第一,每十户中九户有车。
近日,一项新的研究发现,维生素 C 和 E 等抗氧化剂会激活一种机制,刺激癌症肿瘤中新血管的生长,帮助它们生长和扩散。
据媒体援引消息人士报道,苹果公司正在测试使用3D打印技术来生产其智能手表的钢质底盘。消息传出后,3D系统一度大涨超10%,不过截至周三收盘,该股涨幅回落至2%以内。
9月2日,坐拥千万粉丝的网红主播“秀才”账号被封禁,在社交媒体平台上引发热议。平台相关负责人表示,“秀才”账号违反平台相关规定,已封禁。据知情人士透露,秀才近期被举报存在违法行为,这可能是他被封禁的部分原因。据悉,“秀才”年龄39岁,是安徽省亳州市蒙城县人,抖音网红,粉丝数量超1200万。他曾被称为“中老年...
9月3日消息,亚马逊的一些股东,包括持有该公司股票的一家养老基金,日前对亚马逊、其创始人贝索斯和其董事会提起诉讼,指控他们在为 Project Kuiper 卫星星座项目购买发射服务时“违反了信义义务”。
据消息,为推广自家应用,苹果现推出了一个名为“Apps by Apple”的网站,展示了苹果为旗下产品(如 iPhone、iPad、Apple Watch、Mac 和 Apple TV)开发的各种应用程序。
特斯拉本周在美国大幅下调Model S和X售价,引发了该公司一些最坚定支持者的不满。知名特斯拉多头、未来基金(Future Fund)管理合伙人加里·布莱克发帖称,降价是一种“短期麻醉剂”,会让潜在客户等待进一步降价。
据外媒9月2日报道,荷兰半导体设备制造商阿斯麦称,尽管荷兰政府颁布的半导体设备出口管制新规9月正式生效,但该公司已获得在2023年底以前向中国运送受限制芯片制造机器的许可。
近日,根据美国证券交易委员会的文件显示,苹果卫星服务提供商 Globalstar 近期向马斯克旗下的 SpaceX 支付 6400 万美元(约 4.65 亿元人民币)。用于在 2023-2025 年期间,发射卫星,进一步扩展苹果 iPhone 系列的 SOS 卫星服务。
据报道,马斯克旗下社交平台𝕏(推特)日前调整了隐私政策,允许 𝕏 使用用户发布的信息来训练其人工智能(AI)模型。新的隐私政策将于 9 月 29 日生效。新政策规定,𝕏可能会使用所收集到的平台信息和公开可用的信息,来帮助训练 𝕏 的机器学习或人工智能模型。
9月2日,荣耀CEO赵明在采访中谈及华为手机回归时表示,替老同事们高兴,觉得手机行业,由于华为的回归,让竞争充满了更多的可能性和更多的魅力,对行业来说也是件好事。
《自然》30日发表的一篇论文报道了一个名为Swift的人工智能(AI)系统,该系统驾驶无人机的能力可在真实世界中一对一冠军赛里战胜人类对手。
近日,非营利组织纽约真菌学会(NYMS)发出警告,表示亚马逊为代表的电商平台上,充斥着各种AI生成的蘑菇觅食科普书籍,其中存在诸多错误。
社交媒体平台𝕏(原推特)新隐私政策提到:“在您同意的情况下,我们可能出于安全、安保和身份识别目的收集和使用您的生物识别信息。”
2023年德国柏林消费电子展上,各大企业都带来了最新的理念和产品,而高端化、本土化的中国产品正在不断吸引欧洲等国际市场的目光。
罗永浩日前在直播中吐槽苹果即将推出的 iPhone 新品,具体内容为:“以我对我‘子公司’的了解,我认为 iPhone 15 跟 iPhone 14 不会有什么区别的,除了序(列)号变了,这个‘不要脸’的东西,这个‘臭厨子’。