CXYVIP Source Code Marketplace (website, mall, and mini-program source code) - 丞旭猿 Forum

Back-End API Design: Designing a High-Performance API Gateway from Scratch


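1. Preface

I recently read through the design of the Soul gateway on GitHub and got interested in writing a high-performance gateway of my own from scratch. After two weeks of development, the core features of my gateway, ship-gate, are basically complete. Its biggest shortcoming is that it has no admin console, because my front-end skills are weak.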


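2. Design

2.1 Technology Selection

A gateway is the entry point for all requests, so it needs very high throughput. Handling requests asynchronously is one way to achieve this, and there are currently two common approaches.

Tomcat/Jetty + NIO + Servlet 3

Servlet 3 already supports asynchronous processing. This approach is widely used; JD.com, Youzan, and Zuul all use it.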


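Netty + NIO

Netty was built for high concurrency, and Vipshop's gateway currently uses this approach. According to a Vipshop technical article, under the same conditions Netty reaches a throughput of 300,000+ requests per second versus 130,000+ for Tomcat, which is a noticeable gap. However, with Netty you have to handle the HTTP protocol yourself, which is fairly tedious.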


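I later found that the Soul gateway is based on Spring WebFlux (which runs on Netty underneath), so there is little need to worry about HTTP protocol handling, and I decided to use Spring WebFlux as well.

The second characteristic of a gateway is extensibility. Netflix Zuul, for example, has preFilters, postFilters, and so on, which make it easy to handle different concerns at different stages. This can be implemented by passing each request through a chain built on the chain-of-responsibility pattern.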
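The chained processing described above can be sketched in plain Java. This is only a minimal illustration of the chain-of-responsibility idea, not the actual API of Zuul or Soul; the names GatewayRequest, GatewayFilter, FilterChain, and FilterChainDemo are hypothetical. Code placed before the call to next() plays the role of a pre-filter, and code placed after it plays the role of a post-filter.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical request context that records which stages ran. */
class GatewayRequest {
    final String path;
    final List<String> trace = new ArrayList<>();

    GatewayRequest(String path) {
        this.path = path;
    }
}

/** One link in the chain; filters with a lower order run first. */
interface GatewayFilter {
    int order();

    void handle(GatewayRequest request, FilterChain chain);
}

/** Keeps the filters sorted and passes the request to the next one. */
class FilterChain {
    private final List<GatewayFilter> filters = new ArrayList<>();
    private int pos = 0;

    FilterChain add(GatewayFilter filter) {
        filters.add(filter);
        filters.sort(Comparator.comparingInt(GatewayFilter::order));
        return this;
    }

    void next(GatewayRequest request) {
        if (pos < filters.size()) {
            filters.get(pos++).handle(request, this);
        }
    }
}

class FilterChainDemo {
    static List<String> run() {
        GatewayFilter logging = new GatewayFilter() {
            @Override
            public int order() {
                return 0;
            }

            @Override
            public void handle(GatewayRequest request, FilterChain chain) {
                request.trace.add("pre:log");   // runs before the rest of the chain
                chain.next(request);
                request.trace.add("post:log");  // runs after the rest of the chain
            }
        };
        GatewayFilter route = new GatewayFilter() {
            @Override
            public int order() {
                return 1;
            }

            @Override
            public void handle(GatewayRequest request, FilterChain chain) {
                request.trace.add("route");
                chain.next(request);
            }
        };
        GatewayRequest request = new GatewayRequest("/order/user/add");
        // Added out of order on purpose; the chain sorts by order().
        new FilterChain().add(route).add(logging).next(request);
        return request.trace;
    }
}
```

Calling FilterChainDemo.run() returns the stages in the order they executed, which makes it easy to see how a filter can wrap everything that comes after it.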


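In a microservice architecture, services are deployed as multiple instances to ensure high availability. When a request reaches the gateway, the gateway has to find all available instances based on the URL, which requires service registration and discovery, that is, a registry. Popular registries today include Apache ZooKeeper and Alibaba's Nacos (Consul is somewhat niche). Since I had already used ZooKeeper when writing an RPC framework, I chose Nacos this time.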
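To make the lookup step concrete, here is a minimal in-memory stand-in for a registry. It is a sketch under stated assumptions, not how Nacos or ZooKeeper work internally: the class name InMemoryRegistry is hypothetical, instances are plain "ip:port" strings, and the app name is assumed to be the first segment of the request path.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

/** Hypothetical in-memory registry: app name -> list of "ip:port" addresses. */
class InMemoryRegistry {
    private final Map<String, List<String>> instances = new ConcurrentHashMap<>();

    /** Called by a service instance when it starts. */
    void register(String appName, String address) {
        instances.computeIfAbsent(appName, k -> new CopyOnWriteArrayList<>()).add(address);
    }

    /** Called by a service instance when it shuts down. */
    void deregister(String appName, String address) {
        List<String> list = instances.get(appName);
        if (list != null) {
            list.remove(address);
        }
    }

    /** Resolves the instances for a request path such as /order/user/add. */
    List<String> lookup(String path) {
        String[] parts = path.split("/");
        if (parts.length < 2) {
            return Collections.emptyList();
        }
        List<String> list = instances.get(parts[1]);
        if (list == null) {
            return Collections.emptyList();
        }
        return new ArrayList<>(list);
    }
}
```

A gateway would call lookup() for each incoming request and then pick one of the returned addresses with a load-balancing algorithm.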


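2.2 Requirements

First, the goal must be clear: what features should the gateway have? They are summarized as follows.

Custom routing rules

Routing rules can be set per version. The match object is one of DEFAULT, HEADER, or QUERY, and the match method is one of =, regex, or like.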
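The three match methods can be sketched as a small helper. This is an illustration only: the names MatchMethod and RuleMatcher are hypothetical, and treating "like" as a substring test is an assumption, since the article does not define its exact semantics.

```java
import java.util.regex.Pattern;

/** Hypothetical match methods mirroring "=", "regex", and "like". */
enum MatchMethod {
    EQUAL, REGEX, LIKE
}

class RuleMatcher {
    /**
     * Checks a header or query value against a rule.
     * LIKE is assumed to mean "contains the rule text".
     */
    static boolean match(String value, MatchMethod method, String rule) {
        if (value == null || rule == null) {
            return false;
        }
        switch (method) {
            case EQUAL:
                return value.equals(rule);
            case REGEX:
                // The whole value must match the regular expression.
                return Pattern.matches(rule, value);
            case LIKE:
                return value.contains(rule);
            default:
                return false;
        }
    }
}
```

For a HEADER rule, the value would be the request header named by the rule's key; for a QUERY rule, it would be the query parameter with that name. A DEFAULT rule matches every request and needs no comparison.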


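Cross-language

The HTTP protocol is inherently language-agnostic.

High performance

Netty is itself a high-performance communication framework. In addition, the server caches data such as routing rules in JVM memory to avoid calling the admin service on each request.

High availability

Cluster mode is supported to prevent single points of failure, and the nodes are stateless.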


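Gray release

Gray release (also known as canary release) is a release approach that allows a smooth transition between black and white. It makes A/B testing possible: some users keep using product feature A while others start using product feature B. If users have no objections to B, the scope is gradually widened until all users have been migrated to B.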


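Gray release can be implemented with the first feature, custom routing rules.

API authentication

Based on the chain-of-responsibility pattern; users only need to develop their own authentication plugin.

Load balancing

Several load-balancing algorithms are supported, such as random, round robin, and weighted round robin. The algorithm is loaded dynamically according to configuration through the SPI mechanism.

2.3 Architecture Design

After studying several excellent gateways (Zuul, Spring Cloud Gateway, and Soul), I divided the project into several modules.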
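As an illustration of the weighted round robin requirement listed in section 2.2, here is a minimal sketch of the "smooth" weighted round-robin variant. It is not necessarily the algorithm ship-gate implements, and it does not show the SPI loading step; the class names WeightedNode and SmoothWeightedRoundRobin are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical node with a static weight and a running "current" score. */
class WeightedNode {
    final String address;
    final int weight;
    int current = 0;

    WeightedNode(String address, int weight) {
        this.address = address;
        this.weight = weight;
    }
}

/** Smooth weighted round robin: spreads picks of heavy nodes instead of bursting. */
class SmoothWeightedRoundRobin {
    private final List<WeightedNode> nodes = new ArrayList<>();
    private int totalWeight = 0;

    synchronized void add(String address, int weight) {
        nodes.add(new WeightedNode(address, weight));
        totalWeight += weight;
    }

    synchronized String chooseOne() {
        if (nodes.isEmpty()) {
            throw new IllegalStateException("no instances available");
        }
        WeightedNode best = null;
        for (WeightedNode node : nodes) {
            // Every node gains its own weight on each round.
            node.current += node.weight;
            if (best == null || node.current > best.current) {
                best = node;
            }
        }
        // The chosen node pays back the total weight.
        best.current -= totalWeight;
        return best.address;
    }
}
```

Over any window of totalWeight consecutive picks, each node is chosen exactly as many times as its weight, so a node with weight 5 receives five of every seven requests when the other two nodes have weight 1.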


The relationships between the modules are shown in the figure:

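[Figure: gateway design]

Note: this diagram differs slightly from the actual implementation. The step where Nacos pushes data to the local cache is not implemented; at present ship-server only pulls by polling on a schedule. The step where ship-admin obtains registered service information from Nacos has also been changed: ServiceA now proactively sends an HTTP request to notify ship-admin when it starts.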
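The scheduled pull mentioned in the note can be sketched as follows. This is a simplified illustration rather than the project's code: the class name PolledServiceCache is hypothetical, and the remote source is modeled as a Supplier so the sketch stays independent of the Nacos client.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Hypothetical local cache that is refreshed by polling a remote source. */
class PolledServiceCache {
    private final Supplier<Map<String, List<String>>> source;
    private volatile Map<String, List<String>> snapshot = Collections.emptyMap();

    PolledServiceCache(Supplier<Map<String, List<String>>> source) {
        this.source = source;
    }

    /** Pulls the current data and swaps it in; apps missing from the source disappear. */
    void refresh() {
        snapshot = Collections.unmodifiableMap(new HashMap<>(source.get()));
    }

    List<String> getInstances(String appName) {
        return snapshot.getOrDefault(appName, Collections.emptyList());
    }

    /** Starts a daemon thread that calls refresh() with a fixed delay. */
    ScheduledExecutorService start(long intervalSeconds) {
        ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "service-sync");
            t.setDaemon(true);
            return t;
        });
        pool.scheduleWithFixedDelay(() -> {
            try {
                refresh();
            } catch (RuntimeException e) {
                // Keep the old snapshot; an uncaught exception would cancel the schedule.
            }
        }, 0L, intervalSeconds, TimeUnit.SECONDS);
        return pool;
    }
}
```

The trade-off of polling is staleness: a change in the registry becomes visible only after the next refresh, so the interval bounds how long the gateway may route to an instance that has already gone away.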

2.4 Table Structure Design

3. Coding

3.1 ship-client-spring-boot-starter

First create a spring-boot-starter named ship-client-spring-boot-starter. Its core class, AutoRegisterListener, does two things when the project starts:

1. It registers the service information with the Nacos registry.
2. It notifies ship-admin that the service is online and registers a shutdown hook for going offline.

The code is as follows:

    public class AutoRegisterListener implements ApplicationListener<ContextRefreshedEvent> {

        private final static Logger LOGGER = LoggerFactory.getLogger(AutoRegisterListener.class);

        private volatile AtomicBoolean registered = new AtomicBoolean(false);

        private final ClientConfigProperties properties;

        @NacosInjected
        private NamingService namingService;

        @Autowired
        private RequestMappingHandlerMapping handlerMapping;

        private final ExecutorService pool;

        /**
         * url list to ignore
         */
        private static List<String> ignoreUrlList = new LinkedList<>();

        static {
            ignoreUrlList.add("/error");
        }

        public AutoRegisterListener(ClientConfigProperties properties) {
            if (!check(properties)) {
                LOGGER.error("client config port,contextPath,appName adminUrl and version cant be empty!");
                throw new ShipException("client config port,contextPath,appName adminUrl and version cant be empty!");
            }
            this.properties = properties;
            pool = new ThreadPoolExecutor(1, 4, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        }

        /**
         * check the ClientConfigProperties
         *
         * @param properties
         * @return
         */
        private boolean check(ClientConfigProperties properties) {
            if (properties.getPort() == null || properties.getContextPath() == null
                    || properties.getVersion() == null || properties.getAppName() == null
                    || properties.getAdminUrl() == null) {
                return false;
            }
            return true;
        }

        @Override
        public void onApplicationEvent(ContextRefreshedEvent event) {
            // the refresh event can fire more than once; register only the first time
            if (!registered.compareAndSet(false, true)) {
                return;
            }
            doRegister();
            registerShutDownHook();
        }

        /**
         * send unregister request to admin when jvm shutdown
         */
        private void registerShutDownHook() {
            final String url = "http://" + properties.getAdminUrl() + AdminConstants.UNREGISTER_PATH;
            final UnregisterAppDTO unregisterAppDTO = new UnregisterAppDTO();
            unregisterAppDTO.setAppName(properties.getAppName());
            unregisterAppDTO.setVersion(properties.getVersion());
            unregisterAppDTO.setIp(IpUtil.getLocalIpAddress());
            unregisterAppDTO.setPort(properties.getPort());
            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                OkhttpTool.doPost(url, unregisterAppDTO);
                LOGGER.info("[{}:{}] unregister from ship-admin success!", unregisterAppDTO.getAppName(), unregisterAppDTO.getVersion());
            }));
        }

        /**
         * register all interface info to register center
         */
        private void doRegister() {
            Instance instance = new Instance();
            instance.setIp(IpUtil.getLocalIpAddress());
            instance.setPort(properties.getPort());
            instance.setEphemeral(true);
            Map<String, String> metadataMap = new HashMap<>();
            metadataMap.put("version", properties.getVersion());
            metadataMap.put("appName", properties.getAppName());
            instance.setMetadata(metadataMap);
            try {
                namingService.registerInstance(properties.getAppName(), NacosConstants.APP_GROUP_NAME, instance);
            } catch (NacosException e) {
                LOGGER.error("register to nacos fail", e);
                throw new ShipException(e.getErrCode(), e.getErrMsg());
            }
            LOGGER.info("register interface info to nacos success!");
            // send register request to ship-admin
            String url = "http://" + properties.getAdminUrl() + AdminConstants.REGISTER_PATH;
            RegisterAppDTO registerAppDTO = buildRegisterAppDTO(instance);
            OkhttpTool.doPost(url, registerAppDTO);
            LOGGER.info("register to ship-admin success!");
        }

        private RegisterAppDTO buildRegisterAppDTO(Instance instance) {
            RegisterAppDTO registerAppDTO = new RegisterAppDTO();
            registerAppDTO.setAppName(properties.getAppName());
            registerAppDTO.setContextPath(properties.getContextPath());
            registerAppDTO.setIp(instance.getIp());
            registerAppDTO.setPort(instance.getPort());
            registerAppDTO.setVersion(properties.getVersion());
            return registerAppDTO;
        }
    }

3.2 ship-server

The ship-server project consists of two main parts:

1. The main flow of dynamic request routing.
2. Synchronizing the local cache with ship-admin and Nacos, which is covered later in section 3.3.

ship-server implements dynamic routing by using a WebFilter to intercept requests and then handing each request to the plugin chain for chained processing.

PluginFilter parses the appName from the URL and then assembles the enabled plugins into a plugin chain.

    public class PluginFilter implements WebFilter {

        private ServerConfigProperties properties;

        public PluginFilter(ServerConfigProperties properties) {
            this.properties = properties;
        }

        @Override
        public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
            String appName = parseAppName(exchange);
            if (CollectionUtils.isEmpty(ServiceCache.getAllInstances(appName))) {
                throw new ShipException(ShipExceptionEnum.SERVICE_NOT_FIND);
            }
            PluginChain pluginChain = new PluginChain(properties, appName);
            pluginChain.addPlugin(new DynamicRoutePlugin(properties));
            pluginChain.addPlugin(new AuthPlugin(properties));
            return pluginChain.execute(exchange, pluginChain);
        }

        private String parseAppName(ServerWebExchange exchange) {
            RequestPath path = exchange.getRequest().getPath();
            // the first path segment is the app name, e.g. /order/user/add -> order
            String appName = path.value().split("/")[1];
            return appName;
        }
    }

PluginChain extends AbstractShipPlugin and holds all the plugins to be executed.

    /**
     * @Author: Ship
     * @Description:
     * @Date: Created in 2020/12/25
     */
    public class PluginChain extends AbstractShipPlugin {
        /**
         * the pos point to current plugin
         */
        private int pos;
        /**
         * the plugins of chain
         */
        private List<ShipPlugin> plugins;

        private final String appName;

        public PluginChain(ServerConfigProperties properties, String appName) {
            super(properties);
            this.appName = appName;
        }

        /**
         * add enabled plugin to chain
         *
         * @param shipPlugin
         */
        public void addPlugin(ShipPlugin shipPlugin) {
            if (plugins == null) {
                plugins = new ArrayList<>();
            }
            if (!PluginCache.isEnabled(appName, shipPlugin.name())) {
                return;
            }
            plugins.add(shipPlugin);
            // order by the plugins order
            plugins.sort(Comparator.comparing(ShipPlugin::order));
        }

        @Override
        public Integer order() {
            return null;
        }

        @Override
        public String name() {
            return null;
        }

        @Override
        public Mono<Void> execute(ServerWebExchange exchange, PluginChain pluginChain) {
            if (pos == plugins.size()) {
                return exchange.getResponse().setComplete();
            }
            return pluginChain.plugins.get(pos++).execute(exchange, pluginChain);
        }

        public String getAppName() {
            return appName;
        }
    }

AbstractShipPlugin implements the ShipPlugin interface and holds the ServerConfigProperties configuration object.

    public abstract class AbstractShipPlugin implements ShipPlugin {

        protected ServerConfigProperties properties;

        public AbstractShipPlugin(ServerConfigProperties properties) {
            this.properties = properties;
        }
    }

The ShipPlugin interface defines the three methods that every plugin must implement: order(), name(), and execute().

    public interface ShipPlugin {
        /**
         * lower values have higher priority
         *
         * @return
         */
        Integer order();

        /**
         * return current plugin name
         *
         * @return
         */
        String name();

        Mono<Void> execute(ServerWebExchange exchange, PluginChain pluginChain);
    }

DynamicRoutePlugin extends the abstract class AbstractShipPlugin and contains the main business logic of dynamic routing.

    public class DynamicRoutePlugin extends AbstractShipPlugin {

        private final static Logger LOGGER = LoggerFactory.getLogger(DynamicRoutePlugin.class);

        private static WebClient webClient;

        private static final Gson gson = new GsonBuilder().create();

        static {
            HttpClient httpClient = HttpClient.create()
                    .tcpConfiguration(client ->
                            client.doOnConnected(conn ->
                                    conn.addHandlerLast(new ReadTimeoutHandler(3))
                                            .addHandlerLast(new WriteTimeoutHandler(3)))
                                    .option(ChannelOption.TCP_NODELAY, true)
                    );
            webClient = WebClient.builder().clientConnector(new ReactorClientHttpConnector(httpClient))
                    .build();
        }

        public DynamicRoutePlugin(ServerConfigProperties properties) {
            super(properties);
        }

        @Override
        public Integer order() {
            return ShipPluginEnum.DYNAMIC_ROUTE.getOrder();
        }

        @Override
        public String name() {
            return ShipPluginEnum.DYNAMIC_ROUTE.getName();
        }

        @Override
        public Mono<Void> execute(ServerWebExchange exchange, PluginChain pluginChain) {
            String appName = pluginChain.getAppName();
            ServiceInstance serviceInstance = chooseInstance(appName, exchange.getRequest());
            // LOGGER.info("selected instance is [{}]", gson.toJson(serviceInstance));
            // request service
            String url = buildUrl(exchange, serviceInstance);
            return forward(exchange, url);
        }

        /**
         * forward request to backend service
         *
         * @param exchange
         * @param url
         * @return
         */
        private Mono<Void> forward(ServerWebExchange exchange, String url) {
            ServerHttpRequest request = exchange.getRequest();
            ServerHttpResponse response = exchange.getResponse();
            HttpMethod method = request.getMethod();

            WebClient.RequestBodySpec requestBodySpec = webClient.method(method).uri(url).headers((headers) -> {
                headers.addAll(request.getHeaders());
            });

            WebClient.RequestHeadersSpec<?> reqHeadersSpec;
            if (requireHttpBody(method)) {
                reqHeadersSpec = requestBodySpec.body(BodyInserters.fromDataBuffers(request.getBody()));
            } else {
                reqHeadersSpec = requestBodySpec;
            }
            // nio->callback->nio
            return reqHeadersSpec.exchange().timeout(Duration.ofMillis(properties.getTimeOutMillis()))
                    .onErrorResume(ex -> {
                        return Mono.defer(() -> {
                            String errorResultJson = "";
                            if (ex instanceof TimeoutException) {
                                errorResultJson = "{\"code\":5001,\"message\":\"network timeout\"}";
                            } else {
                                errorResultJson = "{\"code\":5000,\"message\":\"system error\"}";
                            }
                            return ShipResponseUtil.doResponse(exchange, errorResultJson);
                        }).then(Mono.empty());
                    }).flatMap(backendResponse -> {
                        response.setStatusCode(backendResponse.statusCode());
                        response.getHeaders().putAll(backendResponse.headers().asHttpHeaders());
                        return response.writeWith(backendResponse.bodyToFlux(DataBuffer.class));
                    });
        }

        /**
         * whether the http method needs an http body
         *
         * @param method
         * @return
         */
        private boolean requireHttpBody(HttpMethod method) {
            if (method.equals(HttpMethod.POST) || method.equals(HttpMethod.PUT) || method.equals(HttpMethod.PATCH)) {
                return true;
            }
            return false;
        }

        private String buildUrl(ServerWebExchange exchange, ServiceInstance serviceInstance) {
            ServerHttpRequest request = exchange.getRequest();
            String query = request.getURI().getQuery();
            String path = request.getPath().value().replaceFirst("/" + serviceInstance.getAppName(), "");
            String url = "http://" + serviceInstance.getIp() + ":" + serviceInstance.getPort() + path;
            if (!StringUtils.isEmpty(query)) {
                url = url + "?" + query;
            }
            return url;
        }

        /**
         * choose a ServiceInstance according to route rule config and load balancing algorithm
         *
         * @param appName
         * @param request
         * @return
         */
        private ServiceInstance chooseInstance(String appName, ServerHttpRequest request) {
            List<ServiceInstance> serviceInstances = ServiceCache.getAllInstances(appName);
            if (CollectionUtils.isEmpty(serviceInstances)) {
                LOGGER.error("service instance of {} not find", appName);
                throw new ShipException(ShipExceptionEnum.SERVICE_NOT_FIND);
            }
            String version = matchAppVersion(appName, request);
            if (StringUtils.isEmpty(version)) {
                throw new ShipException("match app version error");
            }
            // filter serviceInstances by version
            List<ServiceInstance> instances = serviceInstances.stream().filter(i -> i.getVersion().equals(version)).collect(Collectors.toList());
            // select an instance based on the load balancing algorithm
            LoadBalance loadBalance = LoadBalanceFactory.getInstance(properties.getLoadBalance(), appName, version);
            ServiceInstance serviceInstance = loadBalance.chooseOne(instances);
            return serviceInstance;
        }

        private String matchAppVersion(String appName, ServerHttpRequest request) {
            List<AppRuleDTO> rules = RouteRuleCache.getRules(appName);
            // rules with a higher priority are tried first
            rules.sort(Comparator.comparing(AppRuleDTO::getPriority).reversed());
            for (AppRuleDTO rule : rules) {
                if (match(rule, request)) {
                    return rule.getVersion();
                }
            }
            return null;
        }

        private boolean match(AppRuleDTO rule, ServerHttpRequest request) {
            String matchObject = rule.getMatchObject();
            String matchKey = rule.getMatchKey();
            String matchRule = rule.getMatchRule();
            Byte matchMethod = rule.getMatchMethod();
            if (MatchObjectEnum.DEFAULT.getCode().equals(matchObject)) {
                return true;
            } else if (MatchObjectEnum.QUERY.getCode().equals(matchObject)) {
                String param = request.getQueryParams().getFirst(matchKey);
                if (!StringUtils.isEmpty(param)) {
                    return StringTools.match(param, matchMethod, matchRule);
                }
            } else if (MatchObjectEnum.HEADER.getCode().equals(matchObject)) {
                HttpHeaders headers = request.getHeaders();
                String headerValue = headers.getFirst(matchKey);
                if (!StringUtils.isEmpty(headerValue)) {
                    return StringTools.match(headerValue, matchMethod, matchRule);
                }
            }
            return false;
        }
    }

3.3 Data Synchronization

App data synchronization

When a back-end service (such as the order service) starts, it registers only the service name, version, IP address, and port with Nacos. The instance weight and the enabled-plugin information are not included, so how is that handled?

The weights and plugin lists of online instances are usually configured in the admin console and then take effect dynamically, so ship-admin needs to periodically update the instance weight and plugin information in the registry. The corresponding code is NacosSyncListener in ship-admin.

    @Configuration
    public class NacosSyncListener implements ApplicationListener<ContextRefreshedEvent> {

        private static final Logger LOGGER = LoggerFactory.getLogger(NacosSyncListener.class);

        private static ScheduledThreadPoolExecutor scheduledPool = new ScheduledThreadPoolExecutor(1,
                new ShipThreadFactory("nacos-sync", true).create());

        @NacosInjected
        private NamingService namingService;

        @Value("${nacos.discovery.server-addr}")
        private String baseUrl;

        @Resource
        private AppService appService;

        @Override
        public void onApplicationEvent(ContextRefreshedEvent event) {
            if (event.getApplicationContext().getParent() != null) {
                return;
            }
            String url = "http://" + baseUrl + NacosConstants.INSTANCE_UPDATE_PATH;
            scheduledPool.scheduleWithFixedDelay(new NacosSyncTask(namingService, url, appService), 0, 30L, TimeUnit.SECONDS);
        }

        class NacosSyncTask implements Runnable {

            private NamingService namingService;

            private String url;

            private AppService appService;

            private Gson gson = new GsonBuilder().create();

            public NacosSyncTask(NamingService namingService, String url, AppService appService) {
                this.namingService = namingService;
                this.url = url;
                this.appService = appService;
            }

            /**
             * Regular update weight,enabled plugins to nacos instance
             */
            @Override
            public void run() {
                try {
                    // get all app names
                    ListView<String> services = namingService.getServicesOfServer(1, Integer.MAX_VALUE, NacosConstants.APP_GROUP_NAME);
                    if (CollectionUtils.isEmpty(services.getData())) {
                        return;
                    }
                    List<String> appNames = services.getData();
                    List<AppInfoDTO> appInfos = appService.getAppInfos(appNames);
                    for (AppInfoDTO appInfo : appInfos) {
                        if (CollectionUtils.isEmpty(appInfo.getInstances())) {
                            continue;
                        }
                        for (ServiceInstance instance : appInfo.getInstances()) {
                            Map<String, Object> queryMap = buildQueryMap(appInfo, instance);
                            String resp = OkhttpTool.doPut(url, queryMap, "");
                            LOGGER.debug("response :{}", resp);
                        }
                    }
                } catch (Exception e) {
                    LOGGER.error("nacos sync task error", e);
                }
            }

            private Map<String, Object> buildQueryMap(AppInfoDTO appInfo, ServiceInstance instance) {
                Map<String, Object> map = new HashMap<>();
                map.put("serviceName", appInfo.getAppName());
                map.put("groupName", NacosConstants.APP_GROUP_NAME);
                map.put("ip", instance.getIp());
                map.put("port", instance.getPort());
                map.put("weight", instance.getWeight().doubleValue());
                NacosMetadata metadata = new NacosMetadata();
                metadata.setAppName(appInfo.getAppName());
                metadata.setVersion(instance.getVersion());
                metadata.setPlugins(String.join(",", appInfo.getEnabledPlugins()));
                map.put("metadata", StringTools.urlEncode(gson.toJson(metadata)));
                map.put("ephemeral", true);
                return map;
            }
        }
    }

ship-server then periodically pulls the app data from Nacos and updates its local Map cache.

    @Configuration
    public class DataSyncTaskListener implements ApplicationListener<ContextRefreshedEvent> {

        private static ScheduledThreadPoolExecutor scheduledPool = new ScheduledThreadPoolExecutor(1,
                new ShipThreadFactory("service-sync", true).create());

        @NacosInjected
        private NamingService namingService;

        @Autowired
        private ServerConfigProperties properties;

        @Override
        public void onApplicationEvent(ContextRefreshedEvent event) {
            if (event.getApplicationContext().getParent() != null) {
                return;
            }
            scheduledPool.scheduleWithFixedDelay(new DataSyncTask(namingService),
                    0L, properties.getCacheRefreshInterval(), TimeUnit.SECONDS);
            WebsocketSyncCacheServer websocketSyncCacheServer = new WebsocketSyncCacheServer(properties.getWebSocketPort());
            websocketSyncCacheServer.start();
        }

        class DataSyncTask implements Runnable {

            private NamingService namingService;

            public DataSyncTask(NamingService namingService) {
                this.namingService = namingService;
            }

            @Override
            public void run() {
                try {
                    // get all app names
                    ListView<String> services = namingService.getServicesOfServer(1, Integer.MAX_VALUE, NacosConstants.APP_GROUP_NAME);
                    if (CollectionUtils.isEmpty(services.getData())) {
                        return;
                    }
                    List<String> appNames = services.getData();
                    // get all instances
                    for (String appName : appNames) {
                        List<Instance> instanceList = namingService.getAllInstances(appName, NacosConstants.APP_GROUP_NAME);
                        if (CollectionUtils.isEmpty(instanceList)) {
                            continue;
                        }
                        ServiceCache.add(appName, buildServiceInstances(instanceList));
                        List<String> pluginNames = getEnabledPlugins(instanceList);
                        PluginCache.add(appName, pluginNames);
                    }
                    ServiceCache.removeExpired(appNames);
                    PluginCache.removeExpired(appNames);
                } catch (NacosException e) {
                    e.printStackTrace();
                }
            }

            private List<String> getEnabledPlugins(List<Instance> instanceList) {
                Instance instance = instanceList.get(0);
                Map<String, String> metadata = instance.getMetadata();
                // plugins: DynamicRoute,Auth
                String plugins = metadata.getOrDefault("plugins", ShipPluginEnum.DYNAMIC_ROUTE.getName());
                return Arrays.stream(plugins.split(",")).collect(Collectors.toList());
            }

            private List<ServiceInstance> buildServiceInstances(List<Instance> instanceList) {
                List<ServiceInstance> list = new LinkedList<>();
                instanceList.forEach(instance -> {
                    Map<String, String> metadata = instance.getMetadata();
                    ServiceInstance serviceInstance = new ServiceInstance();
                    serviceInstance.setAppName(metadata.get("appName"));
                    serviceInstance.setIp(instance.getIp());
                    serviceInstance.setPort(instance.getPort());
                    serviceInstance.setVersion(metadata.get("version"));
                    serviceInstance.setWeight((int) instance.getWeight());
                    list.add(serviceInstance);
                });
                return list;
            }
        }
    }

Routing rule data synchronization

In addition, if a user updates routing rules in the admin console, ship-admin needs to push the rule data to ship-server. Here I followed the Soul gateway's approach and used WebSocket: a full synchronization is performed when the connection is first established, and after that only incremental synchronization is performed whenever routing rules change.

Server side, WebsocketSyncCacheServer:

    /**
     * @Author: Ship
     * @Description:
     * @Date: Created in 2020/12/28
     */
    public class WebsocketSyncCacheServer extends WebSocketServer {

        private final static Logger LOGGER = LoggerFactory.getLogger(WebsocketSyncCacheServer.class);

        private Gson gson = new GsonBuilder().create();

        private MessageHandler messageHandler;

        public WebsocketSyncCacheServer(Integer port) {
            super(new InetSocketAddress(port));
            this.messageHandler = new MessageHandler();
        }

        @Override
        public void onOpen(WebSocket webSocket, ClientHandshake clientHandshake) {
            LOGGER.info("server is open");
        }

        @Override
        public void onClose(WebSocket webSocket, int i, String s, boolean b) {
            LOGGER.info("websocket server close...");
        }

        @Override
        public void onMessage(WebSocket webSocket, String message) {
            LOGGER.info("websocket server receive message:\n[{}]", message);
            this.messageHandler.handler(message);
        }

        @Override
        public void onError(WebSocket webSocket, Exception e) {

        }

        @Override
        public void onStart() {
            LOGGER.info("websocket server start...");
        }

        class MessageHandler {

            public void handler(String message) {
                RouteRuleOperationDTO operationDTO = gson.fromJson(message, RouteRuleOperationDTO.class);
                if (CollectionUtils.isEmpty(operationDTO.getRuleList())) {
                    return;
                }
                // group the rules by app name before updating the cache
                Map<String, List<AppRuleDTO>> map = operationDTO.getRuleList()
                        .stream().collect(Collectors.groupingBy(AppRuleDTO::getAppName));
                if (OperationTypeEnum.INSERT.getCode().equals(operationDTO.getOperationType())
                        || OperationTypeEnum.UPDATE.getCode().equals(operationDTO.getOperationType())) {
                    RouteRuleCache.add(map);
                } else if (OperationTypeEnum.DELETE.getCode().equals(operationDTO.getOperationType())) {
                    RouteRuleCache.remove(map);
                }
            }
        }
    }
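The article does not show RouteRuleCache itself. The following is a minimal sketch of how a cache with this add/remove contract could behave, assuming that rules are identified by their id and that an UPDATE replaces the cached rule with the same id. The names Rule and SimpleRouteRuleCache are hypothetical, and Rule carries only the fields the sketch needs.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical, trimmed-down routing rule. */
class Rule {
    final int id;
    final String appName;
    final String version;

    Rule(int id, String appName, String version) {
        this.id = id;
        this.appName = appName;
        this.version = version;
    }
}

/** Hypothetical rule cache keyed by app name. */
class SimpleRouteRuleCache {
    private final Map<String, List<Rule>> rulesByApp = new ConcurrentHashMap<>();

    /** INSERT or UPDATE: drop any cached rule with the same id, then add the new one. */
    void add(Map<String, List<Rule>> changes) {
        changes.forEach((app, incoming) -> rulesByApp.compute(app, (key, cached) -> {
            List<Rule> merged = new ArrayList<>();
            if (cached != null) {
                merged.addAll(cached);
            }
            for (Rule rule : incoming) {
                merged.removeIf(r -> r.id == rule.id);
                merged.add(rule);
            }
            return merged;
        }));
    }

    /** DELETE: remove the listed rules by id; drop the app entry when nothing is left. */
    void remove(Map<String, List<Rule>> changes) {
        changes.forEach((app, outgoing) -> rulesByApp.computeIfPresent(app, (key, cached) -> {
            List<Rule> remaining = new ArrayList<>(cached);
            for (Rule rule : outgoing) {
                remaining.removeIf(r -> r.id == rule.id);
            }
            return remaining.isEmpty() ? null : remaining;
        }));
    }

    List<Rule> getRules(String appName) {
        List<Rule> rules = rulesByApp.get(appName);
        if (rules == null) {
            return Collections.emptyList();
        }
        return Collections.unmodifiableList(rules);
    }
}
```

Because each change builds a new list instead of mutating the cached one, a request thread that is iterating over the old list is not disturbed by a concurrent incremental update.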

Client side, WebsocketSyncCacheClient:

    @Component
    public class WebsocketSyncCacheClient {

        private final static Logger LOGGER = LoggerFactory.getLogger(WebsocketSyncCacheClient.class);

        private WebSocketClient client;

        private RuleService ruleService;

        private Gson gson = new GsonBuilder().create();

        public WebsocketSyncCacheClient(@Value("${ship.server-web-socket-url}") String serverWebSocketUrl,
                                        RuleService ruleService) {
            if (StringUtils.isEmpty(serverWebSocketUrl)) {
                throw new ShipException(ShipExceptionEnum.CONFIG_ERROR);
            }
            this.ruleService = ruleService;
            ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1,
                    new ShipThreadFactory("websocket-connect", true).create());
            try {
                client = new WebSocketClient(new URI(serverWebSocketUrl)) {
                    @Override
                    public void onOpen(ServerHandshake serverHandshake) {
                        LOGGER.info("client is open");
                        // full synchronization when the connection is established
                        List<AppRuleDTO> list = ruleService.getEnabledRule();
                        String msg = gson.toJson(new RouteRuleOperationDTO(OperationTypeEnum.INSERT, list));
                        send(msg);
                    }

                    @Override
                    public void onMessage(String s) {
                    }

                    @Override
                    public void onClose(int i, String s, boolean b) {
                    }

                    @Override
                    public void onError(Exception e) {
                        LOGGER.error("websocket client error", e);
                    }
                };

                client.connectBlocking();
                // use a scheduled thread pool to reconnect after a disconnect, checking every 30 seconds
                executor.scheduleAtFixedRate(() -> {
                    if (client != null && client.isClosed()) {
                        try {
                            client.reconnectBlocking();
                        } catch (InterruptedException e) {
                            LOGGER.error("reconnect server fail", e);
                        }
                    }
                }, 10, 30, TimeUnit.SECONDS);

            } catch (Exception e) {
                LOGGER.error("websocket sync cache exception", e);
                throw new ShipException(e.getMessage());
            }
        }

        public <T> void send(T t) {
            // busy-waits until the connection is open
            while (!client.getReadyState().equals(ReadyState.OPEN)) {
                LOGGER.debug("connecting ...please wait");
            }
            client.send(gson.toJson(t));
        }
    }

4. Testing

4.1 Dynamic Routing Test

Start Nacos locally: sh startup.sh -m standalone

Start ship-admin.

Start two ship-example instances locally.

Instance 1 configuration:

    ship:
      http:
        app-name: order
        version: gray_1.0
        context-path: /order
        port: 8081
        admin-url: 127.0.0.1:9001
    server:
      port: 8081
    nacos:
      discovery:
        server-addr: 127.0.0.1:8848

Instance 2 configuration:

    ship:
      http:
        app-name: order
        version: prod_1.0
        context-path: /order
        port: 8082
        admin-url: 127.0.0.1:9001
    server:
      port: 8082
    nacos:
      discovery:
        server-addr: 127.0.0.1:8848

Add a routing rule configuration to the database. This rule means that a request whose HTTP header contains name=ship is routed to the nodes of version gray_1.0.

Start ship-server. Once the following log appears, testing can begin:

    2021-01-02 19:57:09.159 INFO 30413 --- [SocketWorker-29] cn.sp.sync.WebsocketSyncCacheServer : websocket server receive message:
    [{"operationType":"INSERT","ruleList":[{"id":1,"appId":5,"appName":"order","version":"gray_1.0","matchObject":"HEADER","matchKey":"name","matchMethod":1,"matchRule":"ship","priority":50}]}]

Use Postman to send a POST request to http://localhost:9000/order/user/add with the header name=ship. Only instance 1 prints a log line:

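    ==========add user,version:gray_1.0

4.2 Performance Load Test

Test environment:

MacBook Pro, 13-inch
Processor: 2.3 GHz quad-core Intel Core i7
Memory: 16 GB 3733 MHz LPDDR4X
Back-end nodes: one
Load-testing tool: wrk

Result: with 20 threads and 500 connections, throughput was roughly 9,400 requests per second.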

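[Figure: load test results]

5. Summary

A journey of a thousand miles begins with a single step. At first I thought writing a gateway would be very hard, but once you actually start, you find it is not that hard, so taking the first step matters. I also ran into many problems along the way and even filed two issues on GitHub against the Soul and Nacos open-source projects, only to find out later that the problems were on my side. Awkward.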

All the code for this article has been uploaded to GitHub: https://github.com/2YSP/ship-gate

References:

https://nacos.io/zh-cn/docs/quick-start.html

https://dromara.org/website/zh-cn/docs/soul/soul.html

https://docs.spring.io/spring-framework/docs/5.1.7.RELEASE/spring-framework-reference/web-reactive.html#webflux

https://github.com/TooTallNate/Java-WebSocket
