关于我一个有思想的程序猿,终身学习实践者,目前在一个创业团队任team lead,技术栈涉及Android、Python、Java和Go,这个也是我们团队的主要技术栈。Github:https://github.com/hylinux1024微信公众号:终身开发者(angrycode)
本节开始项目的编码实现。首先我们来实现登录注册模块的相关API。本项目我们是使用前后端分离的模式,在实现登录注册功能之前,假设我们的接口是开放的,那么需要确定接口校验方案。
目录
0x00 接口校验方案
我们的目标是接口不能被抓包重复访问,并且要对客户端的可靠性进行验证。
防重放攻击可使用参数有timestamp、nonce、token 和 sign支持可信任的客户端的请求则可以考虑添加appkey和appsecret参数公共参数
1、timestamp 时间戳
单位毫秒,也可以是秒,与服务端保持一致;时间戳是无关时区的,所以客户端与服务端的时间戳是可以用来比较的;如果客户端与服务端时间戳相差比较大,则可以考虑使用服务端时间进行校准;时间戳的作用是,保证这个请求在一定时间内(例如60秒内)是有效的。有效期内的校验就需要nonce参数
2、nonce 随机数
由客户端产生的随机数,客户端每次接口请求时需要保证它是不一样的。nonce的作用是保证timestamp有效期内的请求是否是合法的。服务端接收到这个参数后,会将其保存在某个集合中。 服务端会检测这个nonce是否在该集合中出现过,如果出现过说明该请求是不合法的。每个nonce的有效期设置跟timestamp参数有关,例如可以设置为60秒。
3、token 登录态
需要登录的接口则需要一个token参数。服务端生成的token在有效期内有效,如果token过期则需要提示客户端重新登录。token的生成规则可使用随机数
token = md5(1024位的随机数)4、sign 签名或校验参数
msg = 除了timestamp、nonce、token、sign参数之外的其它排序后的参数列表和值列表 = sort(参数1=值1&参数2=值2&参数3=值3...) sign = md5(msg+token+timestamp+nonce+salt) salt = 客户端与服务端约定字符串5、appkey 和 appsecret
服务端为可信任的客户端分配appkey和appsecret参数。可由随机数或自定义的规则生成,要保证appkey和appsecret是对应的。客户端需保证appsecret不被泄露。客户端接口请求时只需带上appkey参数。appsecret则添加到sign校验参数的计算中
sign = md5(token+msg+timestamp+nonce+appsecret)结合上面的参数,一个接口请求应该类似这样
?phone=×tamp=15&nonce=34C2AF&sign=e10adc3949ba59abbe56e057f20f883e&appkey=A23CE80D服务端程序接收到请求后验证流程应该是这样的
通过appkey查询到appsecret,如果查不到则返回出错信息,否则继续;通过timestamp检查nonce是否在有效时间内是的重复请求,如果是多次重复请求,则返回出错信息,否则继续;通过请求参数构造msg并计算sign,将此参数与请求中获取到的参数进行对比,验证成功后才开始我们的业务逻辑。这样我们的一个简单实用的接口验证方案就出来了,当然可能还有其它一些好的想法,欢迎留言一起探讨学习。
0x01 show me the code
现在开始实现登录注册功能,相信这个模块走通了,之后其它模块也是依样画葫芦。
先看下模块
├── api │ ├── __init__.py │ └── auth.py ├── app.py ├── config.ini ├── datingtoday.sql ├── models.py ├── requirements.txt ├── test └── venv增加了一个api相关的文件包。还有一个config.ini,主要用于配置数据库等信息,而models.py文件是定义实体类的地方。
api/__init__.py
from flask import jsonify def make_response_ok(data=None): resp = {code: 0, msg: success} if data: resp[data] = data return jsonify(resp) def make_response_error(code, msg): resp = {code: code, msg: msg} return jsonify(resp) def validsign(func): """ 验证签名 :param func: :return: """ def decorator(): params = request.form appkey = params.get(appkey) sign = params.get(sign) csign = signature(params) if not appkey: return make_response_error(300, appkey is none.) if csign != sign: return make_response_error(500, signature is error.) return func() return decorator在__init__.py中首先定义了两个封装统一的json数据结构的的方法,主要是用到flask中的jsonify函数,它可以把一个对象转成json。
在前面我们讲了接口的验证逻辑,这一部分对参数的校验功能其实是可以通用的,所以对这个逻辑也进行了封装成validsign方法。
不错,这是一个装饰器的定义。我们希望在接口访问的方法使用装饰器,就可以进行通用的接口校验。
auth.py
这一节的重点是实现登录注册和发短信接口,因此创建一个 auth.py的文件来写跟授权登录相关的接口,这样有利于我们组织代码。我们知道要实现接口的访问路径的定义与方法直接的对应,是使用@route这个装饰器。这里我们在一个新文件中定义我们的接口,就需要用到Blueprint
A blueprint is an object that allows defining application functions without requiring an application object ahead of time. It uses the same decorators as Flask, but defers the need for an application by recording them for later registration.说白了,它的作用跟@route差不多。
由于我们把登录注册当作一个接口来实现,即用户通过短信进行登录,后端会判断该用户是否为新用户,如果是新用户则自动注册。
0x02 短信接口
首先定义接口的访问路径为
{host:port}/api/auth/sendsms 请求方法:POST 参数:phone 请求成功 { "code": 0, "data": { "code": "97532", "phone": "" }, "msg": "success" }根据接口定义我们会在auth.py中定义一个Blueprint对象用来映射我们的访问路径和方法。
bp = Blueprint("auth", __name__, url_prefix=/api/auth)短信接口的实现这里会使用到redis,将请求到的短信验证码保存在redis中,并设置过期时间。然后登录时,再进行验证。
@bp.route("/sendsms", methods=[POST], endpoint="sendsms") @validsign def send_sms(): phone = request.form.get(phone) m = re.match(pattern_phone, phone) if not m: return make_response_error(300, phone number format error.) # 这里需要修改为对接短信服务 code = 97532 key = f{phone}-{code} r.set(key, code, 60) return make_response_ok({phone: phone, code: code})注意这里的endpoint="sendsms"是必需设置,因为@validsign会修饰我们的方法,每个方法都是用一个通用的校验,方法名称会变成一样的,所以如果不设置endpoint会导致url映射失败。
0x03 登录注册接口
首先定义接口的访问路径为
{host:port}/api/auth/login 请求方法:POST 参数:phone 参数:code 请求成功 { "code": 0, "data": { "expire_time": "2019-08-10 07:34:20", "token": "5bea89727ef162d35c", "user_id": }, "msg": "success" }执行登录接口时,会先验证redis中的验证码,然后查一下授权表user_auth看看是否是新用户,最后返回用户的登录授权信息。
@bp.route("/login", methods=[POST], endpoint=login) @validsign def login(): phone = request.form.get(phone) code = request.form.get(code) key = f{phone}-{code} sms_code = r.get(key) if sms_code: sms_code = sms_code.decode() if code != sms_code: return make_response_error(503, sms code error) auth_info = UserAuth.query.filter_by(open_id=phone).first() if not auth_info: auth_info = register_by_phone(phone) else: auth_info = login_by_phone(auth_info) data = {token: auth_info.token, expired_time: auth_info.expired_time.strftime("%Y-%m-%d %H:%M:%S"), user_id: auth_info.user_basic.id} r.set(fauth_info_{auth_info.user_id}, str(data)) return make_response_ok(data)总体上逻辑还是比较清晰的,最后我们看一下app.py
from flask import Flask from api import auth, config from models import db app = Flask(__name__) # 将blueprint注册到app中 app.register_blueprint(auth.bp) # 配置app的config,将数据库信息配置好 app.config[SQLALCHEMY_TRACK_MODIFICATIONS] = False app.config["SQLALCHEMY_DATABASE_URI"] = config[DATABASE][uri] # 最好生成一个secret_key app.secret_key = 8c2c0b555e6e6cb01a5fd36dd981bcee db.init_app(app) @app.route(/) def hello_world(): return Hello World! if __name__ == __main__: app.run()配置文件config.ini
# 配置数据库链接 [DATABASE] uri = mysql+pymysql://user:[email protected]:3306/datingtoday # 配置appkey和secret [APP] appkey = 432ABZ appsecret = 1AE32B092240x04 单元测试
由于接口都需要动态计算校验码,所以单元测试是必需的。这里我使用最简单的方式,直接使用unittest模块。
例如测试发短信的业务接口,首先生成一个随机数nonce,然后计算校验码sign参数,最后调用flask中的post方法模拟接口请求。
def test_sendsms(self): import math nonce = math.floor(random.uniform(, 0)) params = {phone: , appkey: 432ABZ, timestamp: datetime.now().timestamp(), nonce: nonce} sign = signature(params) params[sign] = sign respdata = self.app.post("/api/auth/sendsms", data=params) resp = respdata.json self.assertEqual(resp[code], 0, respdata.data)如果请求成功,就认为通过测试。当然这里的逻辑还是比较简单,希望小伙伴们留言讨论。
0x05 项目地址
源码地址:https://github.com/hylinux1024/datingtoday
Flask官方地址:https://palletsprojects.com/p/flask/
注意本文会使用到mysql和redis数据库,需要自行安装。