关于渗透测试:渗透测试之记一次红包激励模块逻辑漏洞测试总结

浸透测试之红包激励模块逻辑破绽测试总结

下图是本次App逻辑破绽测试集体总结的领导准则和测试方法,领导准则前面对应的是发现的破绽,有些破绽形容做了脱敏泛化解决,依据这些破绽根本能够反推出测试用例。这些是自己认为App逻辑破绽测试应该重点关注的,可能不全,如有补充欢送探讨(其余sql注入、XSS等其余类型破绽不是本次测试的重点,测试的对象App,另外看了后端代码都是用的orm框架个别也没有什么问题)。

背景

公司不同App产品线的红包激励模块曾好几次被破解,呈现过几次针对我司产品的autojs破解协定apk,自己作为风控部门业余做逆向的技术人员,曾逆向剖析过这些黑产app,大部分都是抓包获取用户token后间接发包获取处分,绕过看广告、做工作等环节。
而本次工作的次要目标就是主动出击,对公司外围产品的红包激励模块进行逻辑破绽测试,看是否有容易被薅羊毛的的逻辑破绽,对发现的破绽督促业务线进行整改。尽管自己始终做的逆向方向的,但也长期关注平安相干的其余畛域,对浸透测试的办法和工具等也是近朱者赤;近墨者黑。本文次要是做自己本次所做浸透的测试简要总结。

浸透测试要点总结

浸透测试和代码审计相辅相成
通常须要编写burpsuite插件实现主动sign计算,这样在批改参数重放或并发测试时不须要关怀sign计算问题,再评估完sign算法爱护强度后,能够间接看代码算法实现,而不须要齐全逆向还原能够进步测试效率。
很多破绽,如并发破绽等是能够通过代码审计一眼看进去的,不须要齐全黑盒测试,这也能够极大进步测试效率。当然也有因为对代码逻辑不熟或代码逻辑过于简单,导致看代码还不是间接黑盒测试来的快得状况。集体感觉两者是相辅相成的。
竞争/并发破绽
服务端对客户端申请都是并发解决的,而且很多服务端还有负载平衡多节点解决,这样当对数据库某字段进行加减操作(如红包发放)如果没有加锁就会导致竞争/并发破绽,分布式锁个别用数据库锁或redis锁。
做好提现最初关卡审核
本次测试的一个重点就是看是否齐全脱离app间接通过发包就能够实现提现的,教训证在有提现微信账号openid的前提下是能够全程脱离APP通过发包实现提现1元到账的。但微信的openid是跟app绑定的,不同app对应的openid不同,须要获取微信的openid,须要调⽤微信受权登录接⼝来获取,并且调⽤时会校验调⽤⽅app的签名,若app的签名与后盾配置的签名不⼀致,⽆法胜利调⽤,试过各种签名坑骗未失效,预计不是在app内校验签名的而是在微信里校验的,⽬前openid是抓包获取的。
设想一下如果能够齐全脱离app就能够全程发包获取处分并提现(如领取号手机号间接提现),那么即便限度了每个账号的提现次数那也是很恐怖的。比方开发一个在线的薅羊毛工具,而后到处流传,每个人只有扫个码或者填个手机号就能够提现一元,流传量大也很恐怖。
解决办法就是当时在app内做好各个用户行为的埋点数据和用户环境检测数据等,而后在提现重要关卡验证这些数据是否失常,不失常就回绝提现。

burpsuite脚本

对于如何应用burpsuite来进行测试,看官网帮忙文档或网上材料根本就能学会这边就不开展细说,附上解决过burpsuite插件脚本供参考。

# -*- coding: utf-8 -*-
from burp import IBurpExtender
from burp import IHttpListener
from burp import IRequestInfo
from burp import IParameter
from burp import IBurpExtenderCallbacks
from java.io import PrintWriter
import base64
from hashlib import md5
import time
HOST_FROM = "yy.xx.com"
# https://portswigger.net/burp/extender/api/index.html
class BurpExtender(IBurpExtender, IHttpListener):

    #
    # implement IBurpExtender
    #
    
    def    registerExtenderCallbacks(self, callbacks):
        # obtain an extension helpers object
        self._helpers = callbacks.getHelpers()
        self._stdout = PrintWriter(callbacks.getStdout(), True)
        
        # set our extension name
        callbacks.setExtensionName("zz resign plugin")
        
        # register ourselves as an HTTP listener
        callbacks.registerHttpListener(self)

    #
    # implement IHttpListener
    #
    
    def processHttpMessage(self, toolFlag, messageIsRequest, messageInfo):
        # only process requests
        if not messageIsRequest:
            return

        # get the HTTP service for the request
        httpService = messageInfo.getHttpService()
        
        # only process the host that is HOST_FROM
        if (HOST_FROM != httpService.getHost()):
            return

        # self._stdout.println(toolFlag)
        # self._stdout.println(IBurpExtenderCallbacks.TOOL_REPEATER)
        if (IBurpExtenderCallbacks.TOOL_REPEATER != toolFlag) or (IBurpExtenderCallbacks.TOOL_INTRUDER != toolFlag):
            return

        requestInfo = self._helpers.analyzeRequest(messageInfo)
        path = requestInfo.getUrl().getPath()
        self._stdout.println("path=%s" % path)
        if not path.startswith("/xx/"):
            return
        self.resign(messageInfo, check_sign=False)

    def resign(self, messageInfo, check_sign=False):

        requestInfo = self._helpers.analyzeRequest(messageInfo)
        method = requestInfo.getMethod()
        # self._stdout.println("method=%s" % method)

        path = requestInfo.getUrl().getPath()
        # self._stdout.println("path=%s" % path)
  
        parameters = requestInfo.getParameters()
        body_offset = requestInfo.getBodyOffset()
        url_param_dist = {}
        for param in parameters:
            if IParameter.PARAM_URL == param.getType():
                k = param.getName()
                v = param.getValue()
                v = self._helpers.urlDecode(v) # 须要转化一下有些%号没有decode,签名算法是谬误的
                self._stdout.println("%s=%s" % (k, v))
                # ...
        
        request = messageInfo.getRequest()
        body = request[body_offset:]
        body = self._helpers.bytesToString(body)
        self._stdout.println("body=%s" % body)

        s = '{}-{}'.format(method, path)
        # params
        old_sign = ""
        new_time_sign = None
        new_ts = str(int(time.time()))
        if not check_sign:
            url_param_dist["_ts"] = new_ts #更新工夫戳
            if "XX" in url_param_dist:
                s = str(url_param_dist["XX"]) + time.strftime('%Y%m%d')
                new_time_sign = md5(s.encode()).hexdigest()
                url_param_dist["sign"] = new_time_sign #更新sign
        arguments = url_param_dist
        keys = list(arguments.keys())
        keys.sort()
        for k in keys:
            if k == '_sign':
                old_sign = arguments[k]
                continue
            val = arguments[k]
            if type(val) == str or type(val) == unicode or type(val) == int:
                s += '&{}={}'.format(k, val)
            elif type(val) == list:
                for v in val:
                    s += '&{}={}'.format(k, v)
            else:
                self._stdout.println('params unknown type:{}, key:{}'.format(type(val), k))
                return

        # json body
        s += '&json={}'.format(body or '')

        digest = md5(s.encode()).hexdigest()
        new_sign = base64.urlsafe_b64encode(digest)[:-2]
        # self._stdout.println('new_sign:{}, old_sign:{}'.format(new_sign, old_sign))
        request = self._helpers.urlDecode(request)
        if not check_sign:
            new_request = self._helpers.updateParameter(request, self._helpers.buildParameter("ts", new_ts, IParameter.PARAM_URL))
            new_request = self._helpers.updateParameter(new_request, self._helpers.buildParameter("_sign", new_sign, IParameter.PARAM_URL))
            if new_time_sign:
                new_request = self._helpers.updateParameter(new_request, self._helpers.buildParameter("sign", new_time_sign, IParameter.PARAM_URL))
            # self._stdout.println(self._helpers.bytesToString(new_request))
            messageInfo.setRequest(new_request)
        else:
            if new_sign != old_sign:
                self._stdout.println("error! new_sign != old_sign")
            else:
                self._stdout.println("success! new_sign == old_sign")

        # messageInfo.setHttpService(self._helpers.buildHttpService(HOST_TO,
        #     httpService.getPort(), httpService.getProtocol()))

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理