标签 Python 下的文章

用Tornado写东西,在查询SQL时出现了这个错误:

#Python3下
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: format requires a mapping

很亦可赛艇,是一个关于格式化字符串的问题,出问题的是下面这条SQL语句:

 sqlcmd_author = "SELECT * FROM `shici_authors` WHERE `name` LIKE '%%s%'"%(name)

后面找到了解决方案,多打几个百分号:

sqlcmd_author = "SELECT * FROM shici_authors WHERE name LIKE '%%%%%s%%%%'"%(                   name)

然而还没有完,Web项目为了防止SQL注入,使用了:

#db_cur.execute(sqlcmd_author,parameter)
sqlcmd_author = "SELECT * FROM shici_authors WHERE name LIKE '%%%%%s%%%%'"
db_cur.execute(sqlcmd_author,(name))
#output:
pymysql.err.ProgrammingError: (1064, "You have an error in your SQL syntax; check the manual that corresponds to your MariaDB server version for the right syntax to use near '李白'%%'' at line 1")

我勒个去,网上也没详细的说,最后终于找到了解决方法:

sqlcmd_author = "SELECT * FROM shici_authors WHERE name LIKE %s" #不要给%s加单/双引号,pymysql会自动转义
db_cur.execute(sqlcmd_author,('%' + name + '%'))

有效命中。

Cookie

由于HTTP协议是无状态的(一旦数据交换完毕就断开连接),很难知道客户端的身份,再次连接也不知道,在这种情况下,Cookie诞生了,它是服务器留在客户端的一小段信息,用于区分客户身份,游览器每次在请求时,都会在头文件上带上这个Cookie。
演示(Web.py):

#coding=utf-8
import web
urls = ('/', 'hello') #路由
class hello:
    def GET(self):
        web.setcookie('test_cookie','hello')
        return 'Hello,world!'

if __name__ == "__main__":
    app = web.application(urls, globals())
    app.run()

Session(会话)

会话是Cookie后另一种解决方式,与Cookie最大的不同是会话存储在服务器,尽管会话依赖于Cookie(会话需要存储会话ID),但客户端除了会话ID也无法篡改其他,极大的提高了安全性。
1.

//由于Tornado没有Session模块,所以找了一些轮子,需要Redis支持
https://github.com/zs1621/tornado-redis-session
https://github.com/cole/tornado-sessions

2.目前用的

pip install pycket
pip install redis

例子:

import tornado.ioloop
import tornado.web
from pycket.session import SessionMixin

class MainHandler(tornado.web.RequestHandler,SessionMixin):
    def get(self):
        test = self.session.get('test_session')
        if test:
            self.write(test)
        else:
            self.session.set('test_session','hello')
            self.write("Hello, world")
if __name__ == "__main__":
    settings = {}
    settings['pycket'] = {}
    settings['pycket']['cookies'] = {}
    settings['pycket']['storage'] = {}
    settings['cookie_secret'] = 'f8lNNzhHTVSOyKab3MKv6A=='
    settings['pycket']['engine'] = 'redis' #数据库类型
    settings['pycket']['storage']['host'] = 'localhost' #host
    settings['pycket']['storage']['port'] = 6379 #端口
    settings['pycket']['storage']['db_sessions'] = 10 #用于会话的数据集
    settings['pycket']['storage']['db_notifications'] = 16 #用于通知的数据集
    settings['pycket']['storage']['max_connections'] = 2**64 #最大连接数
    settings['pycket']['cookies']['expires_days'] = 1 #会话过期时间
    settings['pycket']['cookies']['max_age'] = 86400 #兼容设置,同上
    print(settings)
    app = tornado.web.Application([(r'/', MainHandler),],**settings)
    app.listen(8080)
    tornado.ioloop.IOLoop.current().start()

前言

从Flask转到Tornado顺便玩一下简单的Web.py,随便写写。

安装

Pip(Python2.7):

pip install web.py

最新开发版本:

git clone git://github.com/webpy/webpy.git

Python3(实验版本,不完全的支持):

pip3 install web.py==0.40-dev1

最小的应用

#coding=utf-8
import web
urls = ('/', 'hello') #路由
class hello:
    def GET(self):
        return 'Hello,world!'

if __name__ == "__main__":
    app = web.application(urls, globals())
    app.run()

运行:python hello.py 8080

路由系统

路由

在上面的示例中,urls为路由表,路由表为一个元组,定义了对应路由处理的类。
你可以用一个正则表达式来通配一个路由:

urls = ('/','hello','/(.*)','hello') #替换上面的路由表

def GET(self,name): #增加一个参数
        return 'Hello,' + name

修改后试试访问 http://localhost:8080/me

跳转与重定向

raise web.seeother('/') #跳转到/目录
raise web.redirect('/')

这里两个跳转效果是一样的,但又有本质区别,web.seeother给游览器的是303,让游览器通过GET跳转到新的页面,web.redirect给游览器的是301,永久重定向,那这样的话对表单是极不有好的。
具体:https://www.cnblogs.com/wuguanglin/p/redirect.html

GET 与 POST

GET与POST是常用的数据交换方式,下面介绍在Web.py中如何处理这两种数据:

GET

helloGET函数中,添加一行,这会打印所有的GET参数:

print web.input()
#print web.input().get('test') 取指定参数
添加后访问 http://localhost:8080/?test=hello&test2=world
输出一个字典:<Storage {'test': u'hello', 'test2': u'world'}>

POST

POST需要我们在hello中添加一个POST函数,来处理POST请求:

def POST(self,name):
    print web.data()
    #print web.data().get('test') 取指定参数
    return 'Hello,' + name
    同GET方式输出一个字典(准确来说是是一个神奇的东西),这里也可以使用`web.input()`

获取环境信息

print web.ctx.ip #取客户端ip

你可以通过`web.ctx.`来取到其他的变量:
Request:
    - environ 又被写做. env – 包含标准WSGI环境变量的字典
    - home – 应用的http根路径(译注:可以理解为应用的起始网址,协议+站点域名+应用所在路径)例:http://example.org/admin
    - homedomain – 应用所在站点(可以理解为协议+域名) http://example.org
    - homepath – 当前应用所在的路径,例如: /admin
    - host – 主机名(域名)+用户请求的端口(如果没有的话,就是默认的80端口),例如: example.org, example.org:8080
    - ip – 用户的IP地址,例如: xxx.xxx.xxx.xxx
    - method – 所用的HTTP方法,例如: GET
    - path – 用户请求路径,它是基于当前应用的相对路径。在子应用中,匹配外部应用的那部分网址将被去掉。例如:主应用在code.py中,而子应用在admin.py中。在code.py中, 我们将/admin关联到admin.app。 在admin.py中, 将/stories关联到stories类。在 stories中, web.ctx.path就是/stories, 而非/admin/stories。形如: /articles/845
    - protocol – 所用协议,例如: https
    - query – 跟在’?’字符后面的查询字符串。如果不存在查询参数,它就是一个空字符串。例如: ?fourlegs=good&twolegs=bad
    - fullpath 可以视为 path + query – 包含查询参数的请求路径,但不包括’homepath’。例如:/articles/845?fourlegs=good&twolegs=bad

Response:
    - status – HTTP状态码(默认是’200 OK’) 401 Unauthorized 未经授权
    - headers – 包含HTTP头信息(headers)的二元组列表。
    - output – 包含响应实体的字符串。

参考:http://webpy.org/cookbook/ctx.zh-cn

自定义错误信息

if __name__ == '__main__': app = web.application(urls,globals())下添加两行:

app.notfound = notfound #404,定义由notfound函数处理
app.internalerror = internalerror #500,定义由internalerror函数处理

class Video: #生成器
def GET(self):
    web.header('Content-type','video/mp4') #设置头
    #web.header('Transfer-Encoding','chunked')
    with open('test2.mp4','rb') as f:
            data = f.read(2048) #每次读取2048字节
            while data:
                    print len(data),'-',
                    yield data
                    data = f.read(2048)

Cookie

设置一个Cookie(在GET或POST方式内):
web.setcookie(name,value,time,domain,secure)
- name    (string)名称
- value   (string)值
- time    (int,可选,如果为空仅本次会话有效)有效期,秒
- domain  (string,可选)有效域
- secure  (bool,可选)要求HTTPS
读取一个Cookie(在GET或POST方式内):
web.cookies().get(name) #空返回None
或
foo = web.cookies(cookieName=defaultValue) foo.cookieName #空返回默认值

模板

Web.py自带模板系统,但好像没有jinja2那样使起来顺手。

使用

1.一般使用
render = web.template.render('templates') #templates为模板文件夹
print render.hello(value1,value2,...) #hello为模板文件名,即`hello.html`
2.指定模板文件
render = web.template.frender(“templates/hello.html”)
return render(value1,value2,...)
3.或使用字符串
template = "$def with (name)\nHello $name"
hello = web.template.Template(template)
return hello('world')

语法

不写了:https://www.jianshu.com/p/7817641efe8d

解决中文问题

pip show web.py #查看包路径
修改web\template.py, line 1061:
return Template(open(path).read(), filename=path, **self._keywords)
为:
return Template(open(path,encoding='utf-8').read(), filename=path, **self._keywords)

表单

Web.py有一个表单模块:http://webpy.org/cookbook/forms.zh-cn

SSL支持

添加:

from web.wsgiserver import CherryPyWSGIServer
CherryPyWSGIServer.ssl_certificate = "path/to/ssl_certificate"
CherryPyWSGIServer.ssl_private_key = "path/to/ssl_private_key"

相关内容

摄像头视频流

示例

传输一个mp4流,使游览器播放:

from flask import Flask,Response
app = Flask(__name__)
def Video_stream(file):
        with open(file,'rb') as f:
                data = f.read(5120) #每次读取5120字节
                while data:
                        print len(data),'-',
                        yield data
                        data = f.read(5120)
@app.route('/')
def index():
        return Response(Video_stream('./test.mp4'),mimetype='video/mp4') #Content-Type:video/mp4
if __name__ == '__main__':
        app.run(host='0.0.0.0',port='80',debug=True)

常用流媒体标头

mp3 = "audio/mpeg"
mp4 = "video/mp4"
mp4v = "video/mp4"
mpeg = "video/mpeg"
mpg = "video/mpeg"
wav  = "audio/x-wav"
webm = "video/webm"
wma = "audio/x-ms-wma"
wmv = "video/x-ms-wmv"
wmx = "video/x-ms-wmx"

部署指南

为了适应高并发,https支持(比如gevent就不支持)等条件时,往往会加入中间件,比如Nginx,把这些流媒体交给Nginx处理,可以 减轻Flask应用应用的压力,提高效率。

人脸检测

实现请看上一篇文章:我 是 链 接

算法API

在Opencv中,一共有三种自带的识别算法:
Eigenface,FisherFace和LBPH。

#会产生0-20000的可信度评分,一般低于4000-5000都视为不错的识别结果
#Eigenface(特征脸)
cv2.face.EigenFaceRecognizer_create()
#FisherFace(LDA线性判别分析)
cv2.face.FisherFaceRecognizer_create()
#一般高于80都为不可信的识别,低于50都是不错的识别
#LBPH()
cv2.face.LBPHFaceRecognizer_create()

测试例子

face_get.py 用于截取人脸,注意新建目录face_img

#coding=utf-8
import cv2
name = raw_input("Name >")
i = 1
face_cascade = cv2.CascadeClassifier('./haarcascade_frontalface_default.xml')
camera = cv2.VideoCapture(0)
while True:
    ret, frame = camera.read()
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    faces = face_cascade.detectMultiScale(gray, 1.3, 5)
    for (x,y,w,h) in faces:
        #cv2.rectangle(frame,(x,y),(x+w,y+h),(255,0,0),2)
        face_img = cv2.resize(gray[y:y+h,x:x+w],(200,200))
        print face_img
        print '--------------------------'
        cv2.imwrite('./face_img/' + name + '-' + str(i) + '.pgm',face_img)
        print x,y,w,h
        i = i + 1
    #cv2.imshow("camera", frame)
    #if cv2.waitKey(1000 / 12) & 0xff == ord("q"):
    #    break
camera.release()
#cv2.destroyAllWindows()

face_eig.py Eigenface算法示例

#coding=utf-8
import os
import cv2
import numpy as np
def load_face(path): #加载人脸数据
    X,Y,N = [],[],[]
    for r,d,f in os.walk(path):
        for file in f:
            if file.endswith("pgm"):
                img = cv2.imread(path + "/" + file,cv2.IMREAD_GRAYSCALE)
                print file
                #img = cv2.resize(img,(200,200))
            buf = file.split("-")
            N.append(buf[0])
                X.append(np.asarray(img,dtype=np.uint8))
                Y.append(int(buf[1].split(".")[0]))
    return X,Y,N #人脸数据数组,ID,名称

def face_eig(path):
    [X,Y,N] = load_face(path)
    Y = np.asarray(Y,dtype=np.int32) #将ID转为numpy数组,使其与人脸数据一一对应
    model = cv2.face.createEigenFaceRecognizer() #加载算法
    model.train(np.asarray(X),np.asarray(Y)) #导入数据
    face_cascade = cv2.CascadeClassifier('./haarcascade_frontalface_default.xml')
    camera = cv2.VideoCapture(0)
    while True:
        ret, frame = camera.read()
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        faces = face_cascade.detectMultiScale(gray, 1.3, 5)
        for (x,y,w,h) in faces:
            img = cv2.rectangle(frame,(x,y),(x+w,y+h),(255,0,0),2)
            face = gray[x:x+w,y:y+h] #得到人脸区域
            try:
                face = cv2.resize(face,(200,200),interpolation=cv2.INTER_LINEAR) #调整尺寸,方便使用Eigenface算法
                [id,value] = model.predict(face) #检测,返回上面对应的ID和可信度评分,Eigenface算法会产生0到0-20000 的值,一般低于4000-5000的识别都算是比较精准的
                print "ID:",id,"Value:",value,"Name:",N[id]
            except:
                continue

if __name__ == "__main__":
    face_eig("./face_img")

老物搬运。
画面比在Linux桌面运行的流畅很多,也许是Opencv的cv2.imshow()会不断产生新窗口导致的卡顿,因为我关不掉窗口,它会一直产生新的。

Camera.py

#coding=utf-8
import cv2
class Camera():
    def __init__(self,Camera_id):
        self.video = cv2.VideoCapture(Camera_id)
        self.face_cascade = cv2.CascadeClassifier('./haarcascade_frontalface_default.xml')
    def __del__(self):
        self.video.release()
    def get_img(self):
        success,image = self.video.read()
        gray = cv2.cvtColor(image,cv2.COLOR_BGR2GRAY)
        faces = self.face_cascade.detectMultiScale(gray, 1.3, 5)
        for (x,y,w,h) in faces:
            cv2.rectangle(image,(x,y),(x+w,y+h),(255,0,0),2)
        ret,jpeg = cv2.imencode('.jpg', image) #将image对象转为jpeg格式
        return jpeg.tobytes() #转字节

main.py

from flask import Flask,Response
from Camera import Camera
app = Flask(__name__)
@app.route('/')
def index():
    return '<h3 style="text-align:center"><img src="/video" width="60%"></h3>'

def Video_stream(camera):
    while True:
        img = camera.get_img()
        yield (b'--frame\r\n'b'Content-Type: image/jpeg\r\n\r\n' + img + b'\r\n\r\n') #生成器

@app.route('/video')
def video():
    return Response(Video_stream(Camera(0)), mimetype='multipart/x-mixed-replace; boundary=frame') #合成响应,返回一个自定义标头和图片数据

if __name__ == '__main__':
    app.run(host='0.0.0.0',port=80)

一个微小的例子,检测摄像头中出现的人脸并框出来。

环境部署

sudo apt-get install python-opencv

在Opencv的项目上下载用于检测人脸的级联:

https://github.com/opencv/opencv/blob/master/data/haarcascades/
下载 haarcascade_frontalface_default.xml
(这里面还有用于检测其他的级联文件,比如眼睛)

一个示例

人脸识别中找人脸很重要,只有检测到人脸,再进行算法比对数据库中的人脸。

#coding=utf-8
import cv2
#加载级联文件,返回一个级联分类器
face_cascade = cv2.CascadeClassifier('./haarcascade_frontalface_default.xml')
#打开摄像头
camera = cv2.VideoCapture(0)
while True:
    #读一帧
    ret, frame = camera.read()
    #转为灰度图,人脸检测需要灰度图
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    #人脸检测,返回坐标
    faces = face_cascade.detectMultiScale(gray, 1.3, 5)
    #参数:gray 灰度图像 scaleFactor 检测过程中每次迭代时图像的压缩率 minNeighbors 每个人脸矩形保留近邻数目的最小值
    for (x,y,w,h) in faces:
        #在原图像上画矩形
        cv2.rectangle(frame,(x,y),(x+w,y+h),(255,0,0),2)
    #将其显示出来
    cv2.imshow("camera", frame)
    if cv2.waitKey(1000 / 12) & 0xff == ord("q"):
        break
#关闭摄像头,销毁所有窗口
camera.release()
cv2.destroyAllWindows()

测试结果

2019-02-13-200940_1920x1080_scrot.png

性能

不得不说的是C++的性能比Python快至少一半,但我觉得由于现在计算机性能并不像以前那么弱,所以C++与Python没有什么区别,当然,像嵌入式设备这种性能不强的还是用C++比较好。

搞得这个MicroPython啊,亦可赛艇。
经过上一篇文章,已经配置好了WiFi连接和Webrepl,下面我们来尝试与ESP8266来通信,在GPIO2上我接入了一个DS18x20温度传感器,通过访问ESP8266的8267端口,来获得ESP8266上传感器的数值。

main.py:
(在ESP8266上的socket库叫usocket,json库叫ujson)

 from ds18020 import DS18020
 import usocket
 import ujson
 import esp
 host = '0.0.0.0'
 port = 8267

 ds = DS18020(2)
 ds_addr = ds.getaddr()

 s = usocket.socket()
 s.bind((host,port))
 s.listen(6)
 while True:
     client,addr = s.accept()
     print('Client IP:',addr[0])
     info = {}
     info['flash_size'] = str(esp.flash_size())
     info['flash_free'] = str(esp.freemem())
     info['flash_id'] = str(esp.flash_id())
     info['ds18x20_temp'] = str(ds.gettemp(ds_addr[0]))
     print(info)
     client.send(ujson.dumps(info))
     info = {}
     client.close()

ds18020.py
(很惭愧,只做了一点的微小工作)

 import onewire
 import ds18x20
 import time
 from machine import Pin
 class DS18020:
     def __init__(self,pin):
         self.pin = Pin(pin)
     def getaddr(self):
         ow = onewire.OneWire(self.pin)
         self.ds = ds18x20.DS18X20(ow)
         roms = self.ds.scan()
         return roms
     def gettemp(self,addr):
         self.ds.convert_temp()
         time.sleep_ms(450)
         return self.ds.read_temp(addr)

效果:
31_0.PNG
今天还是挺冷的。