弱网络下,非幂等性操作的分布式微服务处理

浑身演技 · · 1123 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

背景

在浏览器访问服务器时,网速非常慢的情况下。为了获取结果,用户常常会进行重复点击操作。这将会使得一些非幂等性操作的操作结果变得非常不可靠。

举例而言,用户进行付款操作就是一个非幂等性操作。

非幂等性,简单而言,就是一个操作是不可重复的。

方案

在用户浏览器cookie中,加上idempotent_token ,然后在各个微服务中使用拦截器拦截,并且使用分布式锁 进行全局锁定。

由于微服务是分布式的,那么将会出现一种情况是,在某种负载均衡的策略,用户在访问仓库微服务(1),并且同时访问仓库微服务(2),并且同时对库存进行修改。这种情景是合乎情理的,并且他们将会带上同一个idempotent_token进行仓库的微服务操作。这个时候是必须要使用分布式锁进行加锁操作。

原理与实现

拦截器

/**
 * 幂等的拦截器,用于处理非幂等性操作。
 * 幂等性将不予处理,直接放行
 */
public class IdempotentTokenInterceptor extends HandlerInterceptorAdapter {
    private static final Logger log = LoggerFactory.getLogger(IdempotentTokenInterceptor.class);
    public static final String IDEMPOTENT_TOKEN = "idempotent_token";
    @Resource
    private IdempotentDb<IdempotentRo> defaultIdempotentDb;
    @Value("${spring.cloud.consul.host}")
    private String consulHost;
    @Value("${spring.cloud.consul.port}")
    private int consulPort;

    /**
     * 返回幂等错误的信息
     *
     * @param response http的响应
     * @param message  返回的http message
     * @return true -> 继续向下执行;false -> 不继续向下执行,将被拦截
     */
    private boolean with(HttpServletResponse response, String message) {
        response.setStatus(HttpStatus.UNAUTHORIZED.value());
        response.setContentType("application/json");
        response.setCharacterEncoding("utf-8");
        CookieUtil.addCookie(response, IDEMPOTENT_TOKEN, UUID.randomUUID().toString().replaceAll("-", ""), 3600 * 2);
        try (PrintWriter writer = response.getWriter()) {
            writer.append(new Gson().toJson(Result.ResultBuilder.errorWith(message).build()));
            writer.flush();
        } catch (IOException e) {
            e.printStackTrace();
            log.error("cannot close response print writer");
            e.printStackTrace();
        }
        return false;
    }

    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
        return Optional.ofNullable(request.getCookies())
                .map(cookies -> Stream.of(cookies).filter(x -> IDEMPOTENT_TOKEN.equalsIgnoreCase(x.getName()))
                        .map(x -> {
                            String timeStamp = UUID.randomUUID().toString().replaceAll("-", "");
                            // 如果已经找到幂等token,判断是否幂等的值为空
                            return Optional.ofNullable(x.getValue()).map(v -> {
                                List<IdempotentRo> list = defaultIdempotentDb.findByAuthKey(v);
                                if (CollectionUtils.isEmpty(list))
                                    list = new ArrayList<>();

                                // 查找该url是否已经存在与幂等键值对之中
                                boolean hasRequested = list.stream().anyMatch(ir -> request.getMethod().equals(ir.getMethod())
                                        && request.getRequestURI().equals(ir.getAuthUrl()));
                                if (hasRequested) {
                                    log.error("already requested with idempotent token  from the URL of {} by {} method", request.getRequestURI(), request.getMethod());
                                    return with(response, "Please do not repeat the submission");
                                } else {
                                    defaultIdempotentDb.insert(IdempotentRo.IdempotentRoBuilder.build().set(v, request.getRequestURI(), request.getMethod()).create());
                                    CookieUtil.addCookie(response, IDEMPOTENT_TOKEN, UUID.randomUUID().toString().replaceAll("-", ""), 3600 * 2);
                                    return true;
                                }
                            }).orElseGet(() -> {
                                log.error("cannot find value of idempotent token  from the URL of {} by {} method", request.getRequestURI(), request.getMethod());
                                return with(response, "Please do not fake the idempotent token");
                            });

                        }).reduce((x, y) -> x && y).orElseGet(() -> {
                            log.error("cannot find idempotent token from the URL of {} by {} method", request.getRequestURI(), request.getMethod());
                            return with(response, "Please do request with idempotent token");
                        })).orElseGet(() -> {
                    log.error("cannot find cookies from the URL of {} by {} method", request.getRequestURI(), request.getMethod());
                    return with(response, "Please do not fake the request...");
                });

    }

    @Override
    public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, ModelAndView modelAndView) throws Exception {
        super.postHandle(request, response, handler, modelAndView);
    }

    @Override
    public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
        super.afterCompletion(request, response, handler, ex);

    }
}

分布式锁

public class IdempotentDistributedLock implements DistributedLock {
    private final Consul consul;
    private final Session value;
    private final SessionClient sessionClient;
    private final SessionCreatedResponse session;

    public static final String KEY = "consul_key";
    public static final Logger log= LoggerFactory.getLogger(IdempotentDistributedLock.class);

    public IdempotentDistributedLock(Consul consul, String sessionId) {
        this.consul = consul;
        // 获取摘要,作为session id,并创建会话
        this.value = ImmutableSession.builder().name(sessionId).build();
        this.sessionClient = consul.sessionClient();
        this.session = sessionClient.createSession(value);

    }

    @Override
    public void lock() {
        // 进行获取锁的操作,获取不到则将线程进行放入到缓冲队列中
        KeyValueClient keyValueClient = consul.keyValueClient();
        boolean hasAcquired=keyValueClient.acquireLock(KEY,this.value.getName().get(), this.session.getId());

        if(!hasAcquired)
            throw new AlreadyLockedException();
    }

    @Override
    @Deprecated
    public void lockInterruptibly() throws InterruptedException {
        throw new UnsupportedOperationException();
    }

    @Override
    public boolean tryLock() {
        return false;
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return false;
    }

    @Override
    public void unlock() {
        KeyValueClient keyValueClient = consul.keyValueClient();
        keyValueClient.deleteKey(KEY);
        sessionClient.destroySession(session.getId());
    }

    @Override
    @Deprecated
    public Condition newCondition() {
        throw new UnsupportedOperationException();
    }
}

性能测试

在Chrome的slow 3g下,用户访问同个操作,主要的延迟来自于业务的处理。

好处

  • 能够有效的防止用户重复点击
  • 分布式锁实现JVM的Lock接口,用户可无学习难度的使用,并且作为分布式锁进行资源锁定
  • 在以consul的作为一致性的基础服务情况下,用户也可以有效的进行调试排查,直接将所有会话列表查询出来

局限性

  • 用户对于单一资源的锁定将会出现有时难以决断
  • 用户只能进行一次性操作,对于其他想要进行资源的操作的话,将会直接熔断,不再进行等待

未来的趋势

  • 将会解决分布式锁中,复杂多资源的锁定

引用


有疑问加站长微信联系(非本文作者)

本文来自:简书

感谢作者:浑身演技

查看原文:弱网络下,非幂等性操作的分布式微服务处理

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:701969077

1123 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传