分类 编程/笔记 下的文章

这次使用Python来进行Websocket通信,使用Tornado自带的Websocket库,与游览器自带的api。

目录结构

.
├── static
│   └── index.html
└── websockettest.py

Python

#coding=utf-8
import tornado.web
import tornado.websocket
import tornado.httpserver
import tornado.ioloop

client_list = [] #连接用户列表

class WebSocketHandler(tornado.websocket.WebSocketHandler):
        def open(self): #连接进入
                #print(dir(self)) #康康对象有什么
                client_list.append(self)
                pass
        def on_message(self,message): #消息事件
                print(message)
                for client in client_list: #将消息发送到所有在线的连接
                        client.write_message(message)
        def on_close(self): #关闭事件
                client_list.remove(self)
                pass
        def check_origin(self,origin): #允许跨域访问
                print(origin)
                return True
class IndexPageHandler(tornado.web.RequestHandler):
        def get(self):
                self.render('index.html')

class Application(tornado.web.Application):
        def __init__(self):
                handlers = [(r'/',IndexPageHandler),(r'/websocket',WebSocketHandler)]
                settings = {'template_path':'static'} #设置模板文件目录
                tornado.web.Application.__init__(self,handlers,**settings)

if __name__ == '__main__':
        app = Application()
        tornado.httpserver.HTTPServer(app).listen(80)
        tornado.ioloop.IOLoop.instance().start()

Html & JavaSscript

<title>WebSockets Test</title>
<script src="https://cdn.bootcss.com/jquery/3.4.1/jquery.min.js"></script>
<script type="text/javascript">
var ws = new WebSocket("ws://ip/websocket"); #你可以在本地保存网页测试(跨域请求)
//ws.onopen = function(){
//      ws.send('hello!');
//}
ws.onmessage = function(msg){
        $('.message').append("<p>" + msg.data + "</p>");
}
ws.onclose = function(){
        $('.message').append("<p>Connection closed</p>");
}
function sendmsg(){
        var msg = $("#msg").val();
        ws.send(msg);
}
</script>
<div class='message'></div>
<textarea id="msg"></textarea>
<button type="but" onclick="sendmsg()">Send</button>

其他

上次Nodejs的Express-ws网页测试不成功的原因是Socket.io的问题,Socket.io会向socket.io/?EIO=&transport=websocket请求连 接,只有服务端一起使用Socket.io才能正常连接,使用游览器自带访问正常连接(更新:之前不了解Socket.io的工作模式写的不对的话)。

用Tornado写东西,在查询SQL时出现了这个错误:

#Python3下
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: format requires a mapping

很亦可赛艇,是一个关于格式化字符串的问题,出问题的是下面这条SQL语句:

 sqlcmd_author = "SELECT * FROM `shici_authors` WHERE `name` LIKE '%%s%'"%(name)

后面找到了解决方案,多打几个百分号:

sqlcmd_author = "SELECT * FROM shici_authors WHERE name LIKE '%%%%%s%%%%'"%(                   name)

然而还没有完,Web项目为了防止SQL注入,使用了:

#db_cur.execute(sqlcmd_author,parameter)
sqlcmd_author = "SELECT * FROM shici_authors WHERE name LIKE '%%%%%s%%%%'"
db_cur.execute(sqlcmd_author,(name))
#output:
pymysql.err.ProgrammingError: (1064, "You have an error in your SQL syntax; check the manual that corresponds to your MariaDB server version for the right syntax to use near '李白'%%'' at line 1")

我勒个去,网上也没详细的说,最后终于找到了解决方法:

sqlcmd_author = "SELECT * FROM shici_authors WHERE name LIKE %s" #不要给%s加单/双引号,pymysql会自动转义
db_cur.execute(sqlcmd_author,('%' + name + '%'))

有效命中。

不知道大家进来有没有遇到弹窗呢,按道理弹窗只会弹一次,因为写了Cookie:

code.png

JavaScript自带的方法太麻烦,用JQuery简单些。

引用

//先引用JQuery再加载JQuery Cookie
<script src="https://cdn.bootcss.com/jquery/3.3.1/jquery.min.js"></script>
<script src="https://cdn.bootcss.com/jquery-cookie/1.4.1/jquery.cookie.min.js"></script>

方法

$.cookie(name,value,optional parameters)

name - Cookie名称
value - 值
optional parameters - 可选参数(过期时间,创建Cookie的路径,创建Cookie的域,HTTPS传输)

//写Cookie
$.cookie('test', '1'); //临时Cookie,会话结束后删除
$.cookie('test','1',{expires:7,path:'/',domain:'abc.com',secure:true}) //过期时间(天),创建Cookie的路径,创建Cookie的域,HTTPS传输
//读Cookie
var cookie_value = $.cookie('test')
//删除Cookie
$.cookie('test',null) //直接置空

其他

使用JavaScript自带的写法参考:https://www.cnblogs.com/endv/p/8089506.html

Cookie

由于HTTP协议是无状态的(一旦数据交换完毕就断开连接),很难知道客户端的身份,再次连接也不知道,在这种情况下,Cookie诞生了,它是服务器留在客户端的一小段信息,用于区分客户身份,游览器每次在请求时,都会在头文件上带上这个Cookie。
演示(Web.py):

#coding=utf-8
import web
urls = ('/', 'hello') #路由
class hello:
    def GET(self):
        web.setcookie('test_cookie','hello')
        return 'Hello,world!'

if __name__ == "__main__":
    app = web.application(urls, globals())
    app.run()

Session(会话)

会话是Cookie后另一种解决方式,与Cookie最大的不同是会话存储在服务器,尽管会话依赖于Cookie(会话需要存储会话ID),但客户端除了会话ID也无法篡改其他,极大的提高了安全性。
1.

//由于Tornado没有Session模块,所以找了一些轮子,需要Redis支持
https://github.com/zs1621/tornado-redis-session
https://github.com/cole/tornado-sessions

2.目前用的

pip install pycket
pip install redis

例子:

import tornado.ioloop
import tornado.web
from pycket.session import SessionMixin

class MainHandler(tornado.web.RequestHandler,SessionMixin):
    def get(self):
        test = self.session.get('test_session')
        if test:
            self.write(test)
        else:
            self.session.set('test_session','hello')
            self.write("Hello, world")
if __name__ == "__main__":
    settings = {}
    settings['pycket'] = {}
    settings['pycket']['cookies'] = {}
    settings['pycket']['storage'] = {}
    settings['cookie_secret'] = 'f8lNNzhHTVSOyKab3MKv6A=='
    settings['pycket']['engine'] = 'redis' #数据库类型
    settings['pycket']['storage']['host'] = 'localhost' #host
    settings['pycket']['storage']['port'] = 6379 #端口
    settings['pycket']['storage']['db_sessions'] = 10 #用于会话的数据集
    settings['pycket']['storage']['db_notifications'] = 16 #用于通知的数据集
    settings['pycket']['storage']['max_connections'] = 2**64 #最大连接数
    settings['pycket']['cookies']['expires_days'] = 1 #会话过期时间
    settings['pycket']['cookies']['max_age'] = 86400 #兼容设置,同上
    print(settings)
    app = tornado.web.Application([(r'/', MainHandler),],**settings)
    app.listen(8080)
    tornado.ioloop.IOLoop.current().start()

前言

从Flask转到Tornado顺便玩一下简单的Web.py,随便写写。

安装

Pip(Python2.7):

pip install web.py

最新开发版本:

git clone git://github.com/webpy/webpy.git

Python3(实验版本,不完全的支持):

pip3 install web.py==0.40-dev1

最小的应用

#coding=utf-8
import web
urls = ('/', 'hello') #路由
class hello:
    def GET(self):
        return 'Hello,world!'

if __name__ == "__main__":
    app = web.application(urls, globals())
    app.run()

运行:python hello.py 8080

路由系统

路由

在上面的示例中,urls为路由表,路由表为一个元组,定义了对应路由处理的类。
你可以用一个正则表达式来通配一个路由:

urls = ('/','hello','/(.*)','hello') #替换上面的路由表

def GET(self,name): #增加一个参数
        return 'Hello,' + name

修改后试试访问 http://localhost:8080/me

跳转与重定向

raise web.seeother('/') #跳转到/目录
raise web.redirect('/')

这里两个跳转效果是一样的,但又有本质区别,web.seeother给游览器的是303,让游览器通过GET跳转到新的页面,web.redirect给游览器的是301,永久重定向,那这样的话对表单是极不有好的。
具体:https://www.cnblogs.com/wuguanglin/p/redirect.html

GET 与 POST

GET与POST是常用的数据交换方式,下面介绍在Web.py中如何处理这两种数据:

GET

helloGET函数中,添加一行,这会打印所有的GET参数:

print web.input()
#print web.input().get('test') 取指定参数
添加后访问 http://localhost:8080/?test=hello&test2=world
输出一个字典:<Storage {'test': u'hello', 'test2': u'world'}>

POST

POST需要我们在hello中添加一个POST函数,来处理POST请求:

def POST(self,name):
    print web.data()
    #print web.data().get('test') 取指定参数
    return 'Hello,' + name
    同GET方式输出一个字典(准确来说是是一个神奇的东西),这里也可以使用`web.input()`

获取环境信息

print web.ctx.ip #取客户端ip

你可以通过`web.ctx.`来取到其他的变量:
Request:
    - environ 又被写做. env – 包含标准WSGI环境变量的字典
    - home – 应用的http根路径(译注:可以理解为应用的起始网址,协议+站点域名+应用所在路径)例:http://example.org/admin
    - homedomain – 应用所在站点(可以理解为协议+域名) http://example.org
    - homepath – 当前应用所在的路径,例如: /admin
    - host – 主机名(域名)+用户请求的端口(如果没有的话,就是默认的80端口),例如: example.org, example.org:8080
    - ip – 用户的IP地址,例如: xxx.xxx.xxx.xxx
    - method – 所用的HTTP方法,例如: GET
    - path – 用户请求路径,它是基于当前应用的相对路径。在子应用中,匹配外部应用的那部分网址将被去掉。例如:主应用在code.py中,而子应用在admin.py中。在code.py中, 我们将/admin关联到admin.app。 在admin.py中, 将/stories关联到stories类。在 stories中, web.ctx.path就是/stories, 而非/admin/stories。形如: /articles/845
    - protocol – 所用协议,例如: https
    - query – 跟在’?’字符后面的查询字符串。如果不存在查询参数,它就是一个空字符串。例如: ?fourlegs=good&twolegs=bad
    - fullpath 可以视为 path + query – 包含查询参数的请求路径,但不包括’homepath’。例如:/articles/845?fourlegs=good&twolegs=bad

Response:
    - status – HTTP状态码(默认是’200 OK’) 401 Unauthorized 未经授权
    - headers – 包含HTTP头信息(headers)的二元组列表。
    - output – 包含响应实体的字符串。

参考:http://webpy.org/cookbook/ctx.zh-cn

自定义错误信息

if __name__ == '__main__': app = web.application(urls,globals())下添加两行:

app.notfound = notfound #404,定义由notfound函数处理
app.internalerror = internalerror #500,定义由internalerror函数处理

class Video: #生成器
def GET(self):
    web.header('Content-type','video/mp4') #设置头
    #web.header('Transfer-Encoding','chunked')
    with open('test2.mp4','rb') as f:
            data = f.read(2048) #每次读取2048字节
            while data:
                    print len(data),'-',
                    yield data
                    data = f.read(2048)

Cookie

设置一个Cookie(在GET或POST方式内):
web.setcookie(name,value,time,domain,secure)
- name    (string)名称
- value   (string)值
- time    (int,可选,如果为空仅本次会话有效)有效期,秒
- domain  (string,可选)有效域
- secure  (bool,可选)要求HTTPS
读取一个Cookie(在GET或POST方式内):
web.cookies().get(name) #空返回None
或
foo = web.cookies(cookieName=defaultValue) foo.cookieName #空返回默认值

模板

Web.py自带模板系统,但好像没有jinja2那样使起来顺手。

使用

1.一般使用
render = web.template.render('templates') #templates为模板文件夹
print render.hello(value1,value2,...) #hello为模板文件名,即`hello.html`
2.指定模板文件
render = web.template.frender(“templates/hello.html”)
return render(value1,value2,...)
3.或使用字符串
template = "$def with (name)\nHello $name"
hello = web.template.Template(template)
return hello('world')

语法

不写了:https://www.jianshu.com/p/7817641efe8d

解决中文问题

pip show web.py #查看包路径
修改web\template.py, line 1061:
return Template(open(path).read(), filename=path, **self._keywords)
为:
return Template(open(path,encoding='utf-8').read(), filename=path, **self._keywords)

表单

Web.py有一个表单模块:http://webpy.org/cookbook/forms.zh-cn

SSL支持

添加:

from web.wsgiserver import CherryPyWSGIServer
CherryPyWSGIServer.ssl_certificate = "path/to/ssl_certificate"
CherryPyWSGIServer.ssl_private_key = "path/to/ssl_private_key"

相关内容

摄像头视频流

示例

传输一个mp4流,使游览器播放:

from flask import Flask,Response
app = Flask(__name__)
def Video_stream(file):
        with open(file,'rb') as f:
                data = f.read(5120) #每次读取5120字节
                while data:
                        print len(data),'-',
                        yield data
                        data = f.read(5120)
@app.route('/')
def index():
        return Response(Video_stream('./test.mp4'),mimetype='video/mp4') #Content-Type:video/mp4
if __name__ == '__main__':
        app.run(host='0.0.0.0',port='80',debug=True)

常用流媒体标头

mp3 = "audio/mpeg"
mp4 = "video/mp4"
mp4v = "video/mp4"
mpeg = "video/mpeg"
mpg = "video/mpeg"
wav  = "audio/x-wav"
webm = "video/webm"
wma = "audio/x-ms-wma"
wmv = "video/x-ms-wmv"
wmx = "video/x-ms-wmx"

部署指南

为了适应高并发,https支持(比如gevent就不支持)等条件时,往往会加入中间件,比如Nginx,把这些流媒体交给Nginx处理,可以 减轻Flask应用应用的压力,提高效率。

一个例子

test.c

#include "lua.h"
#include "lualib.h"
#include "lauxlib.h" //编译安装会获得这些库文件
static int test(lua_State* lua){
    const char *name = NULL; //指针,用于存储
    name = luaL_checkstring(lua,1); //出栈,从栈中取一个参数
    printf("Hello ",name);
    lua_pushstring(lua,name); //入栈,返回数据
    return 1; //告诉Lua返回值个数
}
//一个映射表,映射模块中的函数
static const struct luaL_Reg libtest[] = {
    {"test",test}, //Lua函数名,函数地址
    {NULL,NULL}
};
//注册函数
int luaopen_test(lua_State* lua){
    //luaL_register(lua, "hello", libhello); 在5.1(含)版本前,需要通过此函数注册模块
    //5.1后注册方式
    lua_newtable(lua);
    luaL_setfuncs(lua,libtest,0);
    return 1;
}

编译:

gcc test.c -fPIC -shared -o test.so //编译为动态链接库

调用:

Lua 5.3.5  Copyright (C) 1994-2018 Lua.org, PUC-Rio
> test = require 'test'
> test.test('jiang')
Hello jiang

一些Api

出栈,取参数:
1.png
入栈,返回
2.png

人脸检测

实现请看上一篇文章:我 是 链 接

算法API

在Opencv中,一共有三种自带的识别算法:
Eigenface,FisherFace和LBPH。

#会产生0-20000的可信度评分,一般低于4000-5000都视为不错的识别结果
#Eigenface(特征脸)
cv2.face.EigenFaceRecognizer_create()
#FisherFace(LDA线性判别分析)
cv2.face.FisherFaceRecognizer_create()
#一般高于80都为不可信的识别,低于50都是不错的识别
#LBPH()
cv2.face.LBPHFaceRecognizer_create()

测试例子

face_get.py 用于截取人脸,注意新建目录face_img

#coding=utf-8
import cv2
name = raw_input("Name >")
i = 1
face_cascade = cv2.CascadeClassifier('./haarcascade_frontalface_default.xml')
camera = cv2.VideoCapture(0)
while True:
    ret, frame = camera.read()
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    faces = face_cascade.detectMultiScale(gray, 1.3, 5)
    for (x,y,w,h) in faces:
        #cv2.rectangle(frame,(x,y),(x+w,y+h),(255,0,0),2)
        face_img = cv2.resize(gray[y:y+h,x:x+w],(200,200))
        print face_img
        print '--------------------------'
        cv2.imwrite('./face_img/' + name + '-' + str(i) + '.pgm',face_img)
        print x,y,w,h
        i = i + 1
    #cv2.imshow("camera", frame)
    #if cv2.waitKey(1000 / 12) & 0xff == ord("q"):
    #    break
camera.release()
#cv2.destroyAllWindows()

face_eig.py Eigenface算法示例

#coding=utf-8
import os
import cv2
import numpy as np
def load_face(path): #加载人脸数据
    X,Y,N = [],[],[]
    for r,d,f in os.walk(path):
        for file in f:
            if file.endswith("pgm"):
                img = cv2.imread(path + "/" + file,cv2.IMREAD_GRAYSCALE)
                print file
                #img = cv2.resize(img,(200,200))
            buf = file.split("-")
            N.append(buf[0])
                X.append(np.asarray(img,dtype=np.uint8))
                Y.append(int(buf[1].split(".")[0]))
    return X,Y,N #人脸数据数组,ID,名称

def face_eig(path):
    [X,Y,N] = load_face(path)
    Y = np.asarray(Y,dtype=np.int32) #将ID转为numpy数组,使其与人脸数据一一对应
    model = cv2.face.createEigenFaceRecognizer() #加载算法
    model.train(np.asarray(X),np.asarray(Y)) #导入数据
    face_cascade = cv2.CascadeClassifier('./haarcascade_frontalface_default.xml')
    camera = cv2.VideoCapture(0)
    while True:
        ret, frame = camera.read()
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        faces = face_cascade.detectMultiScale(gray, 1.3, 5)
        for (x,y,w,h) in faces:
            img = cv2.rectangle(frame,(x,y),(x+w,y+h),(255,0,0),2)
            face = gray[x:x+w,y:y+h] #得到人脸区域
            try:
                face = cv2.resize(face,(200,200),interpolation=cv2.INTER_LINEAR) #调整尺寸,方便使用Eigenface算法
                [id,value] = model.predict(face) #检测,返回上面对应的ID和可信度评分,Eigenface算法会产生0到0-20000 的值,一般低于4000-5000的识别都算是比较精准的
                print "ID:",id,"Value:",value,"Name:",N[id]
            except:
                continue

if __name__ == "__main__":
    face_eig("./face_img")