短信接口防盜刷:技術(shù)方案與實(shí)戰(zhàn)實(shí)現(xiàn)
短信驗(yàn)證碼是用戶身份驗(yàn)證、交易確認(rèn)的核心手段,一旦短信接口被惡意利用(盜刷),不僅會(huì)造成企業(yè)短信費(fèi)用的巨額損失,還可能導(dǎo)致用戶信息泄露、資金被盜等嚴(yán)重后果。本文將從盜刷場(chǎng)景、防護(hù)原理、技術(shù)實(shí)現(xiàn)三個(gè)維度,詳解如何為短信接口構(gòu)建全方位的防盜刷體系,并提供可直接落地的代碼示例。
一、短信接口盜刷的常見場(chǎng)景
批量惡意調(diào)用:攻擊者通過腳本循環(huán)調(diào)用接口,批量獲取驗(yàn)證碼,消耗企業(yè)短信額度;
驗(yàn)證碼爆破:結(jié)合手機(jī)號(hào)枚舉 + 驗(yàn)證碼猜測(cè),嘗試登錄 / 支付等敏感操作;
接口參數(shù)篡改:偽造請(qǐng)求參數(shù)(如手機(jī)號(hào)、簽名),繞過基礎(chǔ)校驗(yàn);
重放攻擊:截取合法請(qǐng)求包,重復(fù)提交消耗短信資源。
二、短信接口防盜刷核心防護(hù)策略
針對(duì)上述場(chǎng)景,需從「請(qǐng)求層 - 校驗(yàn)層 - 風(fēng)控層」三層構(gòu)建防護(hù)體系:
請(qǐng)求層:接口鑒權(quán)、頻率限制、IP / 設(shè)備黑白名單;
校驗(yàn)層:驗(yàn)證碼有效期、唯一性、手機(jī)號(hào)合法性校驗(yàn);
風(fēng)控層:行為分析(如同一 IP / 設(shè)備綁定多手機(jī)號(hào))、異常行為攔截。
三、技術(shù)實(shí)現(xiàn)(Python + Flask 示例)
以下以 Python Flask 框架為例,實(shí)現(xiàn)一個(gè)具備完整防盜刷能力的短信接口,核心包含:接口鑒權(quán)、頻率限制、驗(yàn)證碼生成 / 校驗(yàn)、異常監(jiān)控等功能。
前置依賴
先安裝所需依賴:
bash
運(yùn)行
pip install flask redis pycryptodome python-dotenv
(Redis 用于存儲(chǔ)驗(yàn)證碼、頻率限制計(jì)數(shù);pycryptodome 用于接口簽名加密;dotenv 管理配置)
完整代碼實(shí)現(xiàn)
python
運(yùn)行
import os
import time
import random
import hashlib from datetime import datetime, timedelta from dotenv import load_dotenv
from flask import Flask, request, jsonify
import redis
# 加載環(huán)境變量
load_dotenv()
app = Flask(__name__)
# 初始化 Redis 連接(用于存儲(chǔ)驗(yàn)證碼、頻率限制)
redis_client = redis.Redis(
host=os.getenv("REDIS_HOST", "localhost"),
port=int(os.getenv("REDIS_PORT", 6379)),
password=os.getenv("REDIS_PASSWORD", ""),
db=0,
decode_responses=True
)
# 配置項(xiàng)(可放入 .env 文件)
SMS_EXPIRE_SECONDS = 300 # 驗(yàn)證碼有效期 5 分鐘
REQUEST_LIMIT = 5 # 單個(gè)IP/手機(jī)號(hào) 1分鐘內(nèi)最多請(qǐng)求5次
API_SECRET = os.getenv("API_SECRET", "your_secret_key") # 接口鑒權(quán)密鑰
WHITELIST_IPS = ["127.0.0.1"] # IP白名單(可擴(kuò)展為數(shù)據(jù)庫存儲(chǔ))
# ---------------------- 核心工具函數(shù) ----------------------
def generate_verification_code(length=6):
"""生成6位數(shù)字驗(yàn)證碼"""
return ''.join([str(random.randint(0, 9)) for _ in range(length)])
def check_request_frequency(key, limit=REQUEST_LIMIT, expire=60):
"""
檢查請(qǐng)求頻率(防批量調(diào)用)
:param key: 限流標(biāo)識(shí)(IP/手機(jī)號(hào))
:param limit: 限制次數(shù)
:param expire: 時(shí)間窗口(秒)
:return: True-超出限制,F(xiàn)alse-未超出
"""
current = redis_client.incr(key)
if current == 1:
redis_client.expire(key, expire)
return current > limit
def verify_api_signature(params, signature):
"""
驗(yàn)證接口簽名(防參數(shù)篡改)
:param params: 請(qǐng)求參數(shù)(字典)
:param signature: 客戶端傳入的簽名
:return: True-簽名合法,F(xiàn)alse-非法
"""
# 1. 按參數(shù)名排序
sorted_keys = sorted(params.keys())
# 2. 拼接參數(shù)(key=value)
sign_str = ''.join([f"{k}={params[k]}" for k in sorted_keys])
# 3. 拼接密鑰并加密
sign_str += API_SECRET
actual_sign = hashlib.md5(sign_str.encode()).hexdigest()
# 4. 對(duì)比簽名
return actual_sign == signature
# ---------------------- 接口實(shí)現(xiàn) ----------------------
@app.route("/send-sms", methods=["POST"])
def send_sms():
"""發(fā)送短信驗(yàn)證碼接口(核心防刷邏輯)"""
try:
# 1. 獲取請(qǐng)求參數(shù)
req_data = request.json or {}
mobile = req_data.get("mobile")
sign = req_data.get("sign")
timestamp = req_data.get("timestamp")
client_ip = request.remote_addr
# 2. 基礎(chǔ)參數(shù)校驗(yàn)
if not all([mobile, sign, timestamp]):
return jsonify({"code": 400, "msg": "參數(shù)缺失"}), 400
# 校驗(yàn)手機(jī)號(hào)格式(簡(jiǎn)單版,可擴(kuò)展正則)
if not (len(mobile) == 11 and mobile.isdigit()):
return jsonify({"code": 400, "msg": "手機(jī)號(hào)格式錯(cuò)誤"}), 400
# 校驗(yàn)時(shí)間戳(防重放攻擊,請(qǐng)求有效期5分鐘)
if abs(int(time.time()) - int(timestamp)) > 300:
return jsonify({"code": 400, "msg": "請(qǐng)求已過期"}), 400
# 3. IP白名單校驗(yàn)(白名單跳過后續(xù)限流,可選)
if client_ip not in WHITELIST_IPS:
# 4. 頻率限制(IP+手機(jī)號(hào)雙重限流)
ip_limit_key = f"sms:limit:ip:{client_ip}"
mobile_limit_key = f"sms:limit:mobile:{mobile}"
if check_request_frequency(ip_limit_key):
return jsonify({"code": 429, "msg": "IP請(qǐng)求過于頻繁,請(qǐng)稍后再試"}), 429
if check_request_frequency(mobile_limit_key):
return jsonify({"code": 429, "msg": "手機(jī)號(hào)請(qǐng)求過于頻繁,請(qǐng)稍后再試"}), 429
# 5. 接口簽名校驗(yàn)(防參數(shù)篡改)
sign_params = {"mobile": mobile, "timestamp": timestamp}
if not verify_api_signature(sign_params, sign):
return jsonify({"code": 403, "msg": "簽名驗(yàn)證失敗"}), 403
# 6. 生成并存儲(chǔ)驗(yàn)證碼(Redis)
code = generate_verification_code()
redis_key = f"sms:code:{mobile}"
# 存儲(chǔ)時(shí)拼接時(shí)間戳,便于校驗(yàn)有效期
redis_client.set(
redis_key,
f"{code}|{int(time.time())}",
ex=SMS_EXPIRE_SECONDS
)
# 7. 模擬發(fā)送短信(實(shí)際項(xiàng)目替換為短信服務(wù)商API)
# sms_provider.send(mobile, f"您的驗(yàn)證碼是:{code},有效期5分鐘")
print(f"【模擬發(fā)送】手機(jī)號(hào):{mobile},驗(yàn)證碼:{code}")
return jsonify({"code": 200, "msg": "驗(yàn)證碼發(fā)送成功"}), 200
except Exception as e:
app.logger.error(f"發(fā)送短信異常:{str(e)}")
return jsonify({"code": 500, "msg": "服務(wù)器內(nèi)部錯(cuò)誤"}), 500
@app.route("/verify-sms", methods=["POST"])
def verify_sms():
"""驗(yàn)證碼校驗(yàn)接口(防爆破)"""
try:
req_data = request.json or {}
mobile = req_data.get("mobile")
code = req_data.get("code")
if not all([mobile, code]):
return jsonify({"code": 400, "msg": "參數(shù)缺失"}), 400
# 1. 獲取存儲(chǔ)的驗(yàn)證碼
redis_key = f"sms:code:{mobile}"
stored_data = redis_client.get(redis_key)
if not stored_data:
return jsonify({"code": 400, "msg": "驗(yàn)證碼不存在或已過期"}), 400
# 2. 解析驗(yàn)證碼和生成時(shí)間
stored_code, create_time = stored_data.split("|")
create_time = int(create_time)
# 3. 校驗(yàn)驗(yàn)證碼和有效期
if code != stored_code:
# 錯(cuò)誤次數(shù)計(jì)數(shù)(防爆破,累計(jì)5次鎖定10分鐘)
error_key = f"sms:error:{mobile}"
error_count = redis_client.incr(error_key)
if error_count == 1:
redis_client.expire(error_key, 600)
if error_count >= 5:
return jsonify({"code": 403, "msg": "驗(yàn)證碼錯(cuò)誤次數(shù)過多,10分鐘后重試"}), 403
return jsonify({"code": 400, "msg": f"驗(yàn)證碼錯(cuò)誤,剩余{5-error_count}次機(jī)會(huì)"}), 400
# 4. 校驗(yàn)通過,刪除驗(yàn)證碼(防重復(fù)使用)
redis_client.delete(redis_key)
return jsonify({"code": 200, "msg": "驗(yàn)證碼校驗(yàn)成功"}), 200
except Exception as e:
app.logger.error(f"校驗(yàn)驗(yàn)證碼異常:{str(e)}")
return jsonify({"code": 500, "msg": "服務(wù)器內(nèi)部錯(cuò)誤"}), 500
if __name__ == "__main__":
app.run(debug=True, host="0.0.0.0", port=5000)
代碼關(guān)鍵邏輯說明
頻率限制:通過 Redis 的 incr 計(jì)數(shù),限制單個(gè) IP / 手機(jī)號(hào) 1 分鐘內(nèi)最多請(qǐng)求 5 次,避免批量調(diào)用;
接口簽名:客戶端請(qǐng)求時(shí)需按規(guī)則生成簽名,服務(wù)端校驗(yàn)簽名合法性,防止參數(shù)被篡改;
時(shí)間戳校驗(yàn):限制請(qǐng)求有效期 5 分鐘,防止重放攻擊;
驗(yàn)證碼防護(hù):
驗(yàn)證碼存儲(chǔ)在 Redis 并設(shè)置有效期,避免永久有效;
校驗(yàn)錯(cuò)誤次數(shù)累計(jì),超過 5 次鎖定手機(jī)號(hào) 10 分鐘,防爆破;
校驗(yàn)成功后立即刪除驗(yàn)證碼,防止重復(fù)使用;
基礎(chǔ)校驗(yàn):手機(jī)號(hào)格式、參數(shù)完整性、IP 白名單(可擴(kuò)展為動(dòng)態(tài)管理)。
客戶端調(diào)用示例(Python)
python
運(yùn)行
import requests
import time
import hashlib
API_SECRET = "your_secret_key"
URL = "http://127.0.0.1:5000/send-sms"
def generate_sign(params, secret):
sorted_keys = sorted(params.keys())
sign_str = ''.join([f"{k}={params[k]}" for k in sorted_keys]) + secret
return hashlib.md5(sign_str.encode()).hexdigest()
# 構(gòu)造請(qǐng)求參數(shù)
mobile = "13800138000"
timestamp = str(int(time.time()))
sign_params = {"mobile": mobile, "timestamp": timestamp}
sign = generate_sign(sign_params, API_SECRET)
# 發(fā)送請(qǐng)求
response = requests.post(
URL,
json={
"mobile": mobile,
"timestamp": timestamp,
"sign": sign
}
)
print(response.json())
四、進(jìn)階防護(hù)建議
接入驗(yàn)證碼滑塊 / 圖形驗(yàn)證:對(duì)新設(shè)備 / 新 IP,在調(diào)用短信接口前增加人機(jī)驗(yàn)證,過濾腳本攻擊;
風(fēng)控系統(tǒng)對(duì)接:結(jié)合用戶行為(如注冊(cè)時(shí)長(zhǎng)、交易記錄)、設(shè)備指紋(IMEI、設(shè)備號(hào))、地域信息,識(shí)別異常請(qǐng)求;
短信內(nèi)容管控:統(tǒng)一短信簽名和模板,禁止自定義內(nèi)容,防止利用接口發(fā)送違規(guī)信息;
監(jiān)控告警:設(shè)置短信發(fā)送量閾值,當(dāng)某時(shí)段發(fā)送量突增時(shí),觸發(fā)告警并自動(dòng)限流;
接口日志審計(jì):記錄所有接口請(qǐng)求(IP、手機(jī)號(hào)、時(shí)間、狀態(tài)),便于事后追溯盜刷行為。
總結(jié)
短信接口防盜刷的核心是「多層防護(hù)」:請(qǐng)求層(鑒權(quán)、限流)+ 校驗(yàn)層(驗(yàn)證碼有效期、唯一性)+ 風(fēng)控層(行為分析);
技術(shù)實(shí)現(xiàn)上,Redis 是高頻限流、驗(yàn)證碼存儲(chǔ)的核心工具,接口簽名可有效防止參數(shù)篡改;
基礎(chǔ)防護(hù)(限流、簽名、有效期)可解決 80% 的盜刷問題,進(jìn)階防護(hù)(風(fēng)控、人機(jī)驗(yàn)證)可覆蓋剩余 20% 的復(fù)雜場(chǎng)景。
通過上述方案,既能有效攔截惡意盜刷行為,又能保證合法用戶的使用體驗(yàn),是企業(yè)短信接口安全的核心保障。