Soul网关中利用HTTP长轮询实现数据同步,这也太好用了叭!

沙海 2021年7月14日01:22:38Java评论32字数 16699阅读55分39秒阅读模式
摘要

Soul网关中利用HTTP长轮询实现数据同步,这也太好用了叭! Java学习者社区

Soul网关中利用HTTP长轮询实现数据同步,这也太好用了叭!

Java学习者社区 文章源自JAVA秀-https://www.javaxiu.com/37561.html

Soul网关中利用HTTP长轮询实现数据同步,这也太好用了叭!文章源自JAVA秀-https://www.javaxiu.com/37561.html

前言

Soul是一款异步的、高性能的、跨语言的、响应式的API网关,本文主要来解析Soul中基于Http长轮询同步数据的设计,下图展示了Soul数据同步的流程:文章源自JAVA秀-https://www.javaxiu.com/37561.html

Soul网关中利用HTTP长轮询实现数据同步,这也太好用了叭!文章源自JAVA秀-https://www.javaxiu.com/37561.html

Soul主要由soul-admin和soul-web两个核心模块组成,soul-admin是路由和其他信息配置的管理后台;soul-web是网关主体,用于实现路由转发等功能。文章源自JAVA秀-https://www.javaxiu.com/37561.html

soul-web实现了动态配置功能,会同步soul-admin中的路由和其他配置信息,为了提供更高的响应速度,soul-web所有的缓存配置都存在JVM的ConcurrentMap中,每次请求都走的本地缓存,速度非常快文章源自JAVA秀-https://www.javaxiu.com/37561.html

soul-admin在用户发生配置变更之后,会通过EventPublisher发出配置变更通知,由EventDispatcher处理该变更通知,然后根据配置的同步策略(Http、Websocket、Zookeeper),将配置发送给对应的事件处理器文章源自JAVA秀-https://www.javaxiu.com/37561.html

Http同步策略实现如下:文章源自JAVA秀-https://www.javaxiu.com/37561.html

soul-web主动发起长轮询请求,默认有90s超时时间,如果soul-admin没有数据变更,则会阻塞Http请求,如果有数据发生变更则响应变更的数据信息,如果超过60s仍然没有数据变更则响应空数据,网关层接到响应后,继续发起Http请求,反复同样的请求文章源自JAVA秀-https://www.javaxiu.com/37561.html

启用Http同步策略

soul-admin中开启Http同步策略文章源自JAVA秀-https://www.javaxiu.com/37561.html

soul:  sync:    http:      enabled: true

soul-bootstrap中配置管理后台的请求地址文章源自JAVA秀-https://www.javaxiu.com/37561.html

soul:  sync:    http:      url: http://localhost:9095     

源码解析

这块主要是在soul-admin子模块中实现的文章源自JAVA秀-https://www.javaxiu.com/37561.html

DataChangedListener体系

配置类DataSyncConfiguration中初始化了关于Http长轮询的Bean文章源自JAVA秀-https://www.javaxiu.com/37561.html

@Configurationpublic class DataSyncConfiguration {    @Configuration    @ConditionalOnProperty(name = "soul.sync.http.enabled", havingValue = "true")    @EnableConfigurationProperties(HttpSyncProperties.class)    static class HttpLongPollingListener {        @Bean        @ConditionalOnMissingBean(HttpLongPollingDataChangedListener.class)        public HttpLongPollingDataChangedListener httpLongPollingDataChangedListener(final HttpSyncProperties httpSyncProperties) {            return new HttpLongPollingDataChangedListener(httpSyncProperties);        }    }

HttpLongPollingDataChangedListener继承自AbstractDataChangedListener,和其他同步策略一样都实现自接口DataChangedListener文章源自JAVA秀-https://www.javaxiu.com/37561.html

Soul网关中利用HTTP长轮询实现数据同步,这也太好用了叭!文章源自JAVA秀-https://www.javaxiu.com/37561.html

DataChangedListener这个接口提供了众多不同数据类型变动的方法,供DataChangedEventDispatcher调用,DataChangedEventDispatcher用于处理数据同步的事件分类及分发文章源自JAVA秀-https://www.javaxiu.com/37561.html

public interface DataChangedListener {    default void onAppAuthChanged(List<AppAuthData> changed, DataEventTypeEnum eventType) {    }    default void onPluginChanged(List<PluginData> changed, DataEventTypeEnum eventType) {    }    default void onSelectorChanged(List<SelectorData> changed, DataEventTypeEnum eventType) {    }    default void onMetaDataChanged(List<MetaData> changed, DataEventTypeEnum eventType) {    }    default void onRuleChanged(List<RuleData> changed, DataEventTypeEnum eventType) {    }}
@Componentpublic class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {    private ApplicationContext applicationContext;    //持有DataChangedListener集合    private List<DataChangedListener> listeners;    public DataChangedEventDispatcher(final ApplicationContext applicationContext) {        this.applicationContext = applicationContext;    }    //事件变动时,通知DataChangedListener的不同事件类型的方法    @Override    @SuppressWarnings("unchecked")    public void onApplicationEvent(final DataChangedEvent event) {        for (DataChangedListener listener : listeners) {            switch (event.getGroupKey()) {                case APP_AUTH:                    listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType());                    break;                case PLUGIN:                    listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType());                    break;                case RULE:                    listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType());                    break;                case SELECTOR:                    listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());                    break;                case META_DATA:                    listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType());                    break;                default:                    throw new IllegalStateException("Unexpected value: " + event.getGroupKey());            }        }    }    @Override    public void afterPropertiesSet() {        Collection<DataChangedListener> listenerBeans = applicationContext.getBeansOfType(DataChangedListener.class).values();        this.listeners = Collections.unmodifiableList(new ArrayList<>(listenerBeans));    }}

AbstractDataChangedListener就是定义了一个模板,让子类可以按照指定步骤进行工作,具体每个步骤的工作细节可以由子类自己实现(模版方法模式),以onPluginChanged()插件数据变动方法为例文章源自JAVA秀-https://www.javaxiu.com/37561.html

public abstract class AbstractDataChangedListener implements DataChangedListener, InitializingBean {      @Override    public void onPluginChanged(final List<PluginData> changed, final DataEventTypeEnum eventType) {        if (CollectionUtils.isEmpty(changed)) {            return;        }        this.updatePluginCache();        this.afterPluginChanged(changed, eventType);    }     //修改缓存(可重写)    protected void updatePluginCache() {        this.updateCache(ConfigGroupEnum.PLUGIN, pluginService.listAll());    }     //自定义结束数据变动后要干什么(可重写)    protected void afterPluginChanged(final List<PluginData> changed, final DataEventTypeEnum eventType) {    }  

HttpLongPollingDataChangedListener长轮询实现

soul-admin通过ConfigController暴露Http路径供网关调用并监听数据变化文章源自JAVA秀-https://www.javaxiu.com/37561.html

@ConditionalOnBean(HttpLongPollingDataChangedListener.class)@RestController@RequestMapping("/configs")@Slf4jpublic class ConfigController {    @Resource    private HttpLongPollingDataChangedListener longPollingListener;    @PostMapping(value = "/listener")    public void listener(final HttpServletRequest request, final HttpServletResponse response) {        longPollingListener.doLongPolling(request, response);    }}

这里调用了HttpLongPollingDataChangedListenerdoLongPolling()方法,doLongPolling()是实现Http长轮询的关键:文章源自JAVA秀-https://www.javaxiu.com/37561.html

public class HttpLongPollingDataChangedListener extends AbstractDataChangedListener {   public void doLongPolling(final HttpServletRequest request, final HttpServletResponse response) {        //因为soul-web可能未收到某个配置变更的通知,因此MD5值可能不一致,则立即响应        List<ConfigGroupEnum> changedGroup = compareChangedGroup(request);        String clientIp = getRemoteIp(request);        if (CollectionUtils.isNotEmpty(changedGroup)) {            this.generateResponse(response, changedGroup);            log.info("send response with the changed group, ip={}, group={}", clientIp, changedGroup);            return;        }        //Servlet3.0异步响应http请求        final AsyncContext asyncContext = request.startAsync();        asyncContext.setTimeout(0L);        scheduler.execute(new LongPollingClient(asyncContext, clientIp, HttpConstants.SERVER_MAX_HOLD_TIMEOUT));    }      class LongPollingClient implements Runnable {              @Override        public void run() {            //加入定时任务,如果60s之内没有配置变更,则60s后执行,响应http请求            this.asyncTimeoutFuture = scheduler.schedule(() -> {                clients.remove(LongPollingClient.this);                List<ConfigGroupEnum> changedGroups = compareChangedGroup((HttpServletRequest) asyncContext.getRequest());                sendResponse(changedGroups);            }, timeoutTime, TimeUnit.MILLISECONDS);            clients.add(this);        }              void sendResponse(final List<ConfigGroupEnum> changedGroups) {            //取消调度            if (null != asyncTimeoutFuture) {                asyncTimeoutFuture.cancel(false);            }            generateResponse((HttpServletResponse) asyncContext.getResponse(), changedGroups);            //同步完成结束阻塞            asyncContext.complete();        }      

Http请求到达sou-admin之后,并非立马响应数据,而是利用Servlet3.0的异步机制,异步响应数据。首先,将长轮询请求任务LongPollingClient扔到BlockingQueue中,并且开启调度任务,60s后执行,这样做的目的是60s后将该长轮询请求移除队列,即便是这段时间内没有发生配置数据变更。因为即便是没有配置变更,也得让网关知道,总不能让其干等吧,而且网关请求配置服务时,也有90s的超时时间文章源自JAVA秀-https://www.javaxiu.com/37561.html

长轮询时配置数据变更立即响应数据

讲完了Http请求到达sou-admin这段时间没有变更配置数据60s后返回响应,如果这段时间内,存在配置数据变更呢?文章源自JAVA秀-https://www.javaxiu.com/37561.html

首先Http请求到达sou-admin时最终拿到的配置信息数据都是从AbstractDataChangedListenerCache中获取的文章源自JAVA秀-https://www.javaxiu.com/37561.html

public abstract class AbstractDataChangedListener implements DataChangedListener, InitializingBean {    protected static final ConcurrentMap<String, ConfigDataCache> CACHE = new ConcurrentHashMap<>();

AbstractDataChangedListener中本地缓存数据是何时更新的,只有本地缓存更新了才会立即响应数据文章源自JAVA秀-https://www.javaxiu.com/37561.html

AbstractDataChangedListener中本地缓存数据更新时机:文章源自JAVA秀-https://www.javaxiu.com/37561.html

  • 触发DataChangedEvent事件,DataChangedEventDispatcher监听到来处理数据同步的事件分类及分发文章源自JAVA秀-https://www.javaxiu.com/37561.html

  • 定时任务同步文章源自JAVA秀-https://www.javaxiu.com/37561.html

1)触发Spring事件,DataChangedEventDispatcher监听到来处理数据同步的事件分类及分发文章源自JAVA秀-https://www.javaxiu.com/37561.html

就是DataChangedListener体系中讲解的内容,下面以创建和修改Plugin为例:文章源自JAVA秀-https://www.javaxiu.com/37561.html

@Service("pluginService")public class PluginServiceImpl implements PluginService {      @Override    @Transactional(rollbackFor = Exception.class)    public String createOrUpdate(final PluginDTO pluginDTO) {        final String msg = checkData(pluginDTO);        if (StringUtils.isNoneBlank(msg)) {            return msg;        }        PluginDO pluginDO = PluginDO.buildPluginDO(pluginDTO);        DataEventTypeEnum eventType = DataEventTypeEnum.CREATE;        if (StringUtils.isBlank(pluginDTO.getId())) {            insertPluginDataToResource(pluginDTO);            pluginMapper.insertSelective(pluginDO);        } else {            eventType = DataEventTypeEnum.UPDATE;            pluginMapper.updateSelective(pluginDO);        }        //触发事件        eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.PLUGIN, eventType,                Collections.singletonList(PluginTransfer.INSTANCE.mapToData(pluginDO))));        return StringUtils.EMPTY;    }

创建和修改Plugin之后,触发了DataChangedEventDataChangedEventDispatcher根据groupKey进行分发,调用了AbstractDataChangedListener中实现的onXxxChanged()方法:文章源自JAVA秀-https://www.javaxiu.com/37561.html

@Componentpublic class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {      //事件变动时,通知DataChangedListener的不同事件类型的方法    @Override    @SuppressWarnings("unchecked")    public void onApplicationEvent(final DataChangedEvent event) {        for (DataChangedListener listener : listeners) {            switch (event.getGroupKey()) {                case APP_AUTH:                    listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType());                    break;                case PLUGIN:                    listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType());                    break;                case RULE:                    listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType());                    break;                case SELECTOR:                    listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());                    break;                case META_DATA:                    listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType());                    break;                default:                    throw new IllegalStateException("Unexpected value: " + event.getGroupKey());            }        }    }

AbstractDataChangedListener中采用了模版方法模式,onXxxChanged()方法中修改了缓存,提供了afterXxxChanged()方法让子类自定义结束数据变动后要干什么文章源自JAVA秀-https://www.javaxiu.com/37561.html

public abstract class AbstractDataChangedListener implements DataChangedListener, InitializingBean {      @Override    public void onPluginChanged(final List<PluginData> changed, final DataEventTypeEnum eventType) {        if (CollectionUtils.isEmpty(changed)) {            return;        }        this.updatePluginCache();        this.afterPluginChanged(changed, eventType);    }     //修改缓存(可重写)    protected void updatePluginCache() {        this.updateCache(ConfigGroupEnum.PLUGIN, pluginService.listAll());    }     //自定义结束数据变动后要干什么(可重写)    protected void afterPluginChanged(final List<PluginData> changed, final DataEventTypeEnum eventType) {    }  

HttpLongPollingDataChangedListener中重写了AbstractDataChangedListener中的afterXxxChanged()方法,afterXxxChanged()方法是通过事件变更触发长轮询立即响应的关键文章源自JAVA秀-https://www.javaxiu.com/37561.html

public class HttpLongPollingDataChangedListener extends AbstractDataChangedListener {    @Override    protected void afterPluginChanged(final List<PluginData> changed, final DataEventTypeEnum eventType) {        scheduler.execute(new DataChangeTask(ConfigGroupEnum.PLUGIN));    }      class DataChangeTask implements Runnable {        private final ConfigGroupEnum groupKey;        private final long changeTime = System.currentTimeMillis();        DataChangeTask(final ConfigGroupEnum groupKey) {            this.groupKey = groupKey;        }        @Override        public void run() {           //soul-admin发生了配置变更,挨个将队列中的请求移除,并予以响应            for (Iterator<LongPollingClient> iter = clients.iterator(); iter.hasNext();) {                LongPollingClient client = iter.next();                iter.remove();                client.sendResponse(Collections.singletonList(groupKey));                log.info("send response with the changed group,ip={}, group={}, changeTime={}", client.ip, groupKey, changeTime);            }        }    }  

如果这段时间内,管理员变更了配置数据,此时,会挨个移除队列中的长轮询请求,并响应数据,告知是哪个Group的数据发生了变更(将插件、规则、流量配置、用户配置数据分成不同的组)。文章源自JAVA秀-https://www.javaxiu.com/37561.html

网关收到响应信息之后,只知道是哪个Group发生了配置变更,还需要再次请求该Group的配置数据,这时候会调用ConfigController中暴露的fetch接口拿到最新的配置数据文章源自JAVA秀-https://www.javaxiu.com/37561.html

@ConditionalOnBean(HttpLongPollingDataChangedListener.class)@RestController@RequestMapping("/configs")@Slf4jpublic class ConfigController {    @Resource    private HttpLongPollingDataChangedListener longPollingListener;    @GetMapping("/fetch")    public SoulAdminResult fetchConfigs(@NotNull final String[] groupKeys) {        Map<String, ConfigData<?>> result = Maps.newHashMap();        for (String groupKey : groupKeys) {            ConfigData<?> data = longPollingListener.fetchConfig(ConfigGroupEnum.valueOf(groupKey));            result.put(groupKey, data);        }        return SoulAdminResult.success(SoulResultMessage.SUCCESS, result);    }

关于为什么不是直接将变更的数据写出,作者给出的回答是:因为Http长轮询机制只能保证准实时,如果在网关层处理不及时,或者管理员频繁更新配置,很有可能便错过了某个配置变更的推送,安全起见,我们只告知某个Group信息发生了变更。文章源自JAVA秀-https://www.javaxiu.com/37561.html

2)定时任务同步文章源自JAVA秀-https://www.javaxiu.com/37561.html

定时任务同步是因为触发事件只是单个节点完成缓存更新,假设soul-admin存在A、B两个节点,更新配置数据的请求到达A节点完成了A节点内本地缓存数据的更新,而B节点并未更新本地缓存,所以B节点的数据是通过定时任务来同步的文章源自JAVA秀-https://www.javaxiu.com/37561.html

定时任务同步也是在HttpLongPollingDataChangedListener中实现的文章源自JAVA秀-https://www.javaxiu.com/37561.html

public class HttpLongPollingDataChangedListener extends AbstractDataChangedListener {     @Override    protected void afterInitialize() {        long syncInterval = httpSyncProperties.getRefreshInterval().toMillis();        //定期检查数据的更改并更新缓存        scheduler.scheduleWithFixedDelay(() -> {            log.info("http sync strategy refresh config start.");            try {                this.refreshLocalCache();                log.info("http sync strategy refresh config success.");            } catch (Exception e) {                log.error("http sync strategy refresh config error!", e);            }        }, syncInterval, syncInterval, TimeUnit.MILLISECONDS);        log.info("http sync strategy refresh interval: {}ms", syncInterval);    }      private void refreshLocalCache() {        this.updateAppAuthCache();        this.updatePluginCache();        this.updateRuleCache();        this.updateSelectorCache();        this.updateMetaDataCache();    }     //最终调用的更新cache的方法    protected <T> void updateCache(final ConfigGroupEnum group, final List<T> data) {        String json = GsonUtils.getInstance().toJson(data);        ConfigDataCache newVal = new ConfigDataCache(group.name(), json, Md5Utils.md5(json), System.currentTimeMillis());        ConfigDataCache oldVal = CACHE.put(newVal.getGroup(), newVal);        log.info("update config cache[{}], old: {}, updated: {}", group, oldVal, newVal);    }  

当定时任务更新配置数据之后,md5的值会改变,无论doLongPolling()方法中开启异步响应http请求之前或是60s内没有事件触发配置变更LongPollingClient返回响应其中都会调用compareChangedGroup()方法,判断一下缓存中md5的值是否被改变,其实就是针对定时任务更新配置数据时,也能返回哪个Group的数据发生了变更文章源自JAVA秀-https://www.javaxiu.com/37561.html

private List<ConfigGroupEnum> compareChangedGroup(final HttpServletRequest request) {    List<ConfigGroupEnum> changedGroup = new ArrayList<>(ConfigGroupEnum.values().length);    for (ConfigGroupEnum group : ConfigGroupEnum.values()) {        // md5,lastModifyTime        String[] params = StringUtils.split(request.getParameter(group.name()), ',');        if (params == null || params.length != 2) {            throw new SoulException("group param invalid:" + request.getParameter(group.name()));        }        String clientMd5 = params[0];        long clientModifyTime = NumberUtils.toLong(params[1]);        ConfigDataCache serverCache = CACHE.get(group.name());        // do check.        if (this.checkCacheDelayAndUpdate(serverCache, clientMd5, clientModifyTime)) {            changedGroup.add(group);        }    }    return changedGroup;}

总结

Soul网关中利用HTTP长轮询实现数据同步,这也太好用了叭!文章源自JAVA秀-https://www.javaxiu.com/37561.html

  • 后台通过Controller层暴露API给网关,网关请求后台时后台并不是立即返回响应(数据有无变化),而是hold住请求最大60秒的时间。这些被hold住的请求会加入到阻塞队列中作为内存缓存文章源自JAVA秀-https://www.javaxiu.com/37561.html

  • 这60秒钟如果有事件直接触发的数据变化,通过DataChangedEventDispatcher分发到的HttpLongPollingDataChangedListener,则立即调用线程池在阻塞队列中遍历所有被hold住的请求,塞入响应信息并释放掉文章源自JAVA秀-https://www.javaxiu.com/37561.html

  • 如果60秒过后依然没有数据变化,hold住的请求会被释放,且阻塞队列的对应请求对象被剔除文章源自JAVA秀-https://www.javaxiu.com/37561.html

  • 在开启异步响应Http请求之前或是60s内没有事件触发配置变更返回响应对比缓存中md5的值和网关请求中携带的md5的值看是否是定时任务触发了缓存的变更,从而返回哪个Group的数据发生了变更文章源自JAVA秀-https://www.javaxiu.com/37561.html

Soul网关中的Http长轮询只是Http长轮询中的一种实现,很大程度上参考了Nacos配置中心的设计,在Apollo中用另一种API同样实现长轮询的机制。文章源自JAVA秀-https://www.javaxiu.com/37561.html

参考:

  • https://dromara.org/zh/projects/soul/data-sync/文章源自JAVA秀-https://www.javaxiu.com/37561.html

  • https://dromara.org/zh/blog/soul_source_learning_08_httplongpolling_01/文章源自JAVA秀-https://www.javaxiu.com/37561.html

  • https://dromara.org/zh/blog/soul_source_learning_09_httplongpolling_02/文章源自JAVA秀-https://www.javaxiu.com/37561.html

(感谢阅读,希望对你所有帮助)文章源自JAVA秀-https://www.javaxiu.com/37561.html

来源:blog.csdn.net/qq_40378034/article/details/113927834文章源自JAVA秀-https://www.javaxiu.com/37561.html

推荐阅读• 面试官:线程安全的单例模式写法你就会几种?• 你手写过堵塞队列吗?• IDEA 搞酷点!• Sharding-Jdbc实现读写分离、分库分表,妙!最近面试BATJ,整理一份面试资料《Java面试BAT通关手册》,覆盖了Java核心技术、JVM、Java并发、SSM、微服务、数据库、数据结构等等。获取方式:点“在看”,关注公众号并回复 Java 领取,更多内容陆续奉上。

文章有帮助的话,在看,转发吧。文章源自JAVA秀-https://www.javaxiu.com/37561.html

谢谢支持哟 (*^__^*)文章源自JAVA秀-https://www.javaxiu.com/37561.html

继续阅读
速蛙云 - 极致体验,强烈推荐!!!购买套餐就免费送各大视频网站会员!快速稳定、独家福利社、流媒体稳定解锁!速度快,全球上网、视频、游戏加速、独立IP均支持!基础套餐性价比很高!这里不多说,我一直正在使用,推荐购买:https://www.javaxiu.com/59919.html
weinxin
资源分享QQ群
本站是JAVA秀团队的技术分享社区, 会经常分享资源和教程; 分享的时代, 请别再沉默!
沙海
匿名

发表评论

匿名网友 填写信息

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定