标签 Python 下的文章

前言

我常用的网站要手机验证了。

代码

这里使用buypass的证书,最多五个子域名,这个脚本使用DNS验证。

import json
import datetime

from acme import messages
from acme import challenges
from acme import client
from acme import messages

import josepy as jose

from cryptography import x509
from cryptography.x509.oid import NameOID
from cryptography.hazmat.primitives import serialization, hashes
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives.serialization import load_pem_private_key

def new_csr(domain):
    key = rsa.generate_private_key(
        public_exponent=65537,
        key_size=2048,
    )
    with open(domain[0] + ".key", "wb") as f:
        f.write(key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.TraditionalOpenSSL,
            encryption_algorithm=serialization.NoEncryption(),
        ))
    csr = x509.CertificateSigningRequestBuilder().subject_name(x509.Name([
        x509.NameAttribute(NameOID.COMMON_NAME, domain[0]),
    ])).add_extension(
        x509.SubjectAlternativeName([x509.DNSName(i) for i in domain]),
        critical=False,
    ).sign(key, hashes.SHA256())
    return csr.public_bytes(serialization.Encoding.PEM)

def main():
    domain = ["your_domain_list"]
    # 生成csr
    csr = new_csr(domain)
    # 加载用户秘钥
    # RSA 2048bits-4096bits
    with open("./private.key", "r") as f:
        key = load_pem_private_key(f.read().encode("utf-8"), password=None)
    # 获取jwk
    acc_key = jose.JWKRSA(key=key)
    net = client.ClientNetwork(acc_key, user_agent="acm/0.0.1")
    # 本地测试用
    # net.verify_ssl = False
    # 取得acme目录
    directory = messages.Directory.from_json(net.get("https://api.buypass.com/acme/directory").json())
    # directory = messages.Directory.from_json(net.get("https://acme-v02.api.letsencrypt.org/directory").json())

    client_acme = client.ClientV2(directory, net=net)

    # 加载账户信息
    # with open("./account.json", "r") as f:
    #     user = messages.RegistrationResource.json_loads(f.read())
    # client_acme.query_registration(user)

    # 首次注册
    regr = client_acme.new_account(
        messages.NewRegistration.from_data(
            email=("[email protected]"),
            terms_of_service_agreed=True,
        )
    )

    # 保存账户信息
    with open("./account.json", "w") as f:
        f.write(json.dumps(regr.to_json()))

    # 新建订单
    order = client_acme.new_order(csr)
    challenges_list = []
    for authz in order.authorizations:
        for i in authz.body.challenges:
            # 选取DNS验证
            if isinstance(i.chall, challenges.DNS01):
                response, validation = i.response_and_validation(client_acme.net.key)
                challenges_list.append([i, response])
                print("[DNS]", "_acme-challenge." + authz.body.identifier.value)
                print("[DNS]", validation)

    while input("ok? (y) ") != "y":
        pass

    # 通知准备验证
    for i in challenges_list:
        client_acme.answer_challenge(i[0], i[1])
    print("[INFO]", "wait...")

    # 自动轮询2分钟并取回证书
    finalized_orderr = client_acme.poll_and_finalize(order, deadline=datetime.datetime.now() + datetime.timedelta(minutes=2))
    print("[OUT]", finalized_orderr.fullchain_pem)
    with open(domain[0] + ".crt", "w") as f:
        f.write(finalized_orderr.fullchain_pem)   
    pass

if __name__ == "__main__":
    main()

JSON Web Key

使用JSON的结构来承载公钥。(RFC7517)

import json
import base64
import binascii
from cryptography.hazmat.primitives.serialization import load_pem_private_key

def b64(text):
    return base64.urlsafe_b64encode(text).decode('utf8').replace("=", "")

def get_jwk(path):
    with open(path, "r") as f:
        pri = load_pem_private_key(f.read().encode("utf-8"), password=None)
    pub = pri.public_key().public_numbers()
    e = "{:x}".format(pub.e)
    e = "0{0}".format(e) if int(e) % 2 else e
    e = b64(binascii.unhexlify(e))
    n = b64(binascii.unhexlify(hex(pub.n)[2:]))
    return {"n": n, "e": e, "kty": "RSA"}

if __name__ == "__main__":
    print(get_jwk("./private.key"))

使用了RSA的模数与指数来代表一份公钥,如果是椭圆则是基点坐标。

安装

这里直接使用包管理器提供的版本,不过建议大家使用pip来安装,会少一些坑:

(Debian/Ubuntu) apt-get install uwsgi uwsgi-plugin-python3

或使用pip安装

pip3 install uwsgi

试试看

[demo.py]
from flask import Flask

app = Flask(__name__)

@app.route("/")
def hello_world():
    return "<p>Hello, World!</p>"

终端执行:

uwsgi --http-socket 0.0.0.0:8088 --manage-script-name --mount /=demo:app --plugin python3
(如果你是通过pip安装的,可不加 --plugin python3)
uwsgi --http-socket 0.0.0.0:8088 --manage-script-name --mount /[path]=demo:app --plugin python3

访问:

http://localhost:8088/[path]

部署

通常情况下我们的应用是按照工厂模式所编写的,使用我们需要新建一个文件来暴露出app对象:

from application import create_app
app = create_app()
if __name__ == "__main__":
    app.run()

下面来编写一个配置文件,便于配置修改:

[config.ini]

[uwsgi]
# 使用http协议
# http = 0.0.0.0:8081
# 指定工作用户(组)
uid = www-data
gid = www-data
# 主进程,由本进程派生子进程
master = true
# 工作目录
chdir = /var/application
# 插件(使用pip安装的可省略)
plugins = python3
# 入口文件
wsgi-file = app.py
# 指定入口文件的Flask对象
callable = app
# 指定uwsgi的socket路径
socket = /tmp/application.sock
# 进程数
processes = 2
# 线程数
threads = 4
# 缓冲区大小
buffer-size = 32768

配置Nginx

server {
    listen 80 default_server;
    listen [::]:80 default_server;

    root /var/www/html;

    index index.html index.htm index.nginx-debian.html;

    server_name _;

    # location = /[path] { rewrite ^ /[path]/; }
    location / { try_files $uri @uwsgi; }
    location @uwsgi {
        include uwsgi_params;
        uwsgi_pass unix:/tmp/application.sock;
    }
}

uwsgi后台运行,你可以直接在命令后加-d,但我这里是新建了个服务:

[/etc/systemd/system/uwsgi.service]

[Unit]
Description=uwsgi application

[Service]
User=www-data
Group=www-data
Type=simple
WorkingDirectory=/var/application
ExecStart=/usr/bin/uwsgi /var/application/config.ini

[Install]
WantedBy=multi-user.target

接下来就是设置开机启动了:

(sudo) systemctl enable uwsgi
(启动)
(sudo) systemctl start uwsgi

常见问题

  1. Nginx报5XX错误
    检查你的uwsgi的运行用户,务必保证你创建的socketNginx有权限读写的。
  2. uwsgino app loaded. going in full dynamic mode
    这个用pip安装的不会出现,需要添加python3插件

更多

https://uwsgi-docs.readthedocs.io/en/latest/
http://nginx.org/en/docs/http/ngx_http_uwsgi_module.html

由MaxMind提供,有ASN,国家与市三种类型,支持IPv4与IPv6,使用mmdb或CSV格式分发。
注意:自2019年12月30起,需要注册账号下载。
https://dev.maxmind.com/geoip/geoip2/geolite2/

Python的使用

安装官方提供的模块:

pip3 install geoip2

示例:

import geoip2.database
gi = geoip2.database.Reader('GeoLite2-Country.mmdb')
gn = gi.country('1.1.1.1')
#city,country,asnisp
#print(gn)
print(gn.country.names['zh-CN'])
print(gn.country.iso_code)
#输出:
澳大利亚
AU

Nginx的使用

Nginx需要编译Geoip2模块(有Geoip模块,但只适用旧版的dat格式)。
https://github.com/leev/ngx_http_geoip2_module

(http)
geoip2 /etc/nginx/geoip2/GeoLite2-Country.mmdb {
    #自动重载
    auto_reload 5m;
    $geoip2_metadata_country_build metadata build_epoch;
    #国家代码
    $geoip2_country_code default=US country iso_code;
    #国家名称
    #$geoip2_country_name country names zh-CN;
}
if($geoip2_country_code != CN) {
    deny all;
}

网上有使用PyMouse,PyKeyboard,PyUserInput(前两者的整合,不活跃)的,但发现我并不适用,Pip都装不上,所以寻着PyUserInput的Readme文件找到了Pynput这个库。
这是一篇水文章。

pip install pynput

键盘

from pynput.keyboard import Key,Controller
keyboard = Controller()

#dir(Key) 功能键
#['__class__', '__doc__', '__members__', '__module__', 'alt', 'alt_l', 'alt_r', 'backspace', 'caps_lock', 'cmd', 'cmd_r', 'ctrl', 'ctrl_l', 'ctrl_r', 'delete', 'down', 'end', 'enter', 'esc', 'f1', 'f10', 'f11', 'f12', 'f13', 'f14', 'f15', 'f16', 'f17', 'f18', 'f19', 'f2', 'f20', 'f3', 'f4', 'f5', 'f6', 'f7', 'f8', 'f9', 'home', 'insert', 'left', 'media_next', 'media_play_pause', 'media_previous', 'media_volume_down', 'media_volume_mute', 'media_volume_up', 'menu', 'num_lock', 'page_down', 'page_up', 'pause', 'print_screen', 'right', 'scroll_lock', 'shift', 'shift_r', 'space', 'tab', 'up']

#按下按键
keyboard.press(Key.space)
#释放按键
keyboard.release(Key.space)
#等待按下按键
with keyboard.pressed(Key.shift):
    keyboard.press('a')
    keyboard.release('a')

鼠标

from pynput.mouse import Button,Controller
mouse = Controller()
#读取位置
print(mouse.position)
#设置一个位置
mouse.position = (1926,2020)
#相对移动
mouse.move(88,-88)
#鼠标按键
mouse.press(Button.left)
mouse.release(Button.left)
#双击
mouse.click(Button.left,2)
#滚轮(向下滚动2格)
mouse.scroll(0,2)

另有监控键盘和鼠标的官方例程,不过我用不上,所以就不复制到文章里了:

https://pynput.readthedocs.io/en/latest/mouse.html#monitoring-the-mouse
https://pynput.readthedocs.io/en/latest/keyboard.html#monitoring-the-keyboard

应用

可以写一个Web宏键盘:
Screenshot_2020-07-25.jpg

Flask-Mail是Flask的邮件扩展,能让我们以简单的方式方式邮件。

安装

pip3 install flask_mail

配置

MAIL_SERVER : 服务器地址,默认为 ‘localhost’
MAIL_PORT : 服务器端口,默认为 25
MAIL_USE_TLS : 使用TLS,默认为 False
MAIL_USE_SSL : 使用SSL,默认为 False
MAIL_DEBUG : 是否打开Debug,默认为 app.debug
MAIL_USERNAME : 用户名,默认为 None
MAIL_PASSWORD : 密码(授权码),默认为 None
MAIL_DEFAULT_SENDER : 默认发送者,默认为 None
MAIL_MAX_EMAILS : 重新连接前发送邮件最大数目,默认为 None
MAIL_SUPPRESS_SEND : 启用发送(True为不发送),默认为 app.testing
MAIL_ASCII_ATTACHMENTS : 附件文件名转换为ASCII,默认为 False

示例(同步)

from flask import Flask
from flask_mail import Mail,Message
app = Flask(__name__)

#app.config['MAIL_DEBUG'] = True
app.config['MAIL_SUPPRESS_SEND'] = False
app.config['MAIL_SERVER'] = 'smtp.qq.com'
app.config['MAIL_PORT'] = 465
app.config['MAIL_USE_SSL'] = True
app.config['MAIL_USERNAME'] = '你的邮箱'
app.config['MAIL_PASSWORD'] = '密码(授权码)'
app.config['MAIL_DEFAULT_SENDER'] = '发件人邮箱'
mail = Mail(app)

@app.route('/')
def index():
    msg = Message(subject="标题",sender="发送者邮箱",recipients=[接收者邮箱1,接收者邮箱2...])
    #发送HTML或文本信息
    msg.html = '<h1>hello</h1>'
    #msg.body = 'hello'
    #发送
    mail.send(msg)
    return 'index'

if __name__ == '__main__':
    app.run('0.0.0.0',8080)

异步解决

#省略上面的配置和路由
#(下面代码在路由中)
from threading import Thread
from flask import current_app
def send_mail(app,msg):
    #应用上下文管理器
    with app.app_context():
        mail.send(msg)
#启用线程来异步发送,传入app实例与msg对象
thr = Thread(target = send_mail, args = [app,msg])
thr.start()

在工厂模式下没有app实例怎么办,只需要在线程启动前添加这样一行来获取:

app = current_app._get_current_object()

关于current_app._get_current_object()的讨论:
https://segmentfault.com/q/1010000005865632/a-1020000005865704
Flask_Mail中文文档:
http://www.pythondoc.com/flask-mail/index.html

Select适用于类UNIX系统,可用于Socket,File等有文件描述符的动态轮询。

服务端

#Py3环境下,现在大部分迁移到Py3了
import socket
import select
#监听配置
s = socket.socket()
s.bind(('0.0.0.0',8500))
s.listen(1024)
#连接列表,包括服务器
conn_socket = [s,]
#存储可写连接
write_socket = []
while True:
    #可读列表,可写列表,错误列表,超时时间(单位秒,空为阻塞)
    read_list,write_list,error = select.select(conn_socket,write_socket,[],1)
    #轮询开始,将服务器句柄或客户句柄交给内核轮询,当所轮询句柄有变化时,会出现在read_list或error中
    #程序提交 -> 内核轮询 -> 程序操作
    print(conn_socket)
    for client in read_list:
            if client == s:
                    #接收一个新客户,存入连接列表
                    conn,addr = client.accept()
                    conn_socket.append(conn)
            else:
                    #接收客户数据
                    try:
                            msg = client.recv(1024)
                    except Exception as ex:
                            #客户端断线错误
                            conn_socket.remove(client)
                            write_socket.remove(client)
                    else:
                            #打印接收数据,并添加到可写列表
                            print(msg)
                            write_socket.append(client)
    for client in write_list:
            #向可写列表中的客户发送消息,并从可写列表移除,直到再次客户发送消息
            client.sendall(bytes(('2hello2').encode('utf-8')))
            write_socket.remove(client)
    for client in error:
            #移除发生错误的客户端
            write_socket.remove(client)
            conn_socket.remove(client)

客户端

import socket
s = socket.socket()
s.connect(('127.0.0.1', 8500))
while True:
    i = input('> ')
    s.sendall(bytes(i, encoding='utf-8'))
    rx = str(s.recv(1024),encoding='utf-8')
    print(rx)
s.close()

不足之处

  1. 最多1024的文件描述符(即1024个连接)
  2. 当连接客户增加时,轮询一次会消耗更多时间(程序空间与内核空间的copy,轮询的时间)

Select模块还有poll和epoll事件模型,一般选择epoll较优。

0716更新:

修复了客户端断开时服务器产生错误而崩溃:

import socket
import select
s = socket.socket()
s.bind(('0.0.0.0',8500))
s.listen(1024)
conn_socket = [s,]
write_socket = []
while True:
    read_list,write_list,error = select.select(conn_socket,write_socket,[])
    print('正在监听',conn_socket)
    for client in read_list:
            if client == s:
                    conn,addr = client.accept()
                    conn_socket.append(conn)
            else:
                    try:
                            msg = client.recv(1024)
                    except Exception as ex:
                            conn_socket.remove(client)
                    else:
                            if msg != b'':
                                    print(msg)
                                    if client not in write_socket:
                                            write_socket.append(client)
                            else:
                                    conn_socket.remove(client)
    for client in write_list:
            try:
                    client.sendall(bytes(('2hello2').encode('utf-8')))
                    write_socket.remove(client)
            except:
                    conn_socket.remove(client)
                    write_socket.remove(client)
    for client in error:
            write_socket.remove(client)
            conn_socket.remove(client)

共享全局变量

这是最简单的方法,也是最不线程安全的方法。
一个例子:

import time
import threading
i = 0
def test():
        global i
        while 1:
                i += 1
                #time.sleep(0.3)
def test2():
        global i
        while 1:
                i += 1
                #time.sleep(0.3)
if __name__ == '__main__':
        t1 = threading.Thread(target=test)
        t2 = threading.Thread(target=test2)
        t1.start()
        t2.start()
        for a in range(100):
                print(i)

结果好像好像出现了倒车。

OUTPUT: 42271 51380 51605 52942 53799 53823 55059 61042 63469 61928

由于线程速度太快,一个线程刚拷贝值,另一个已经赋值了,正要拷贝时,另一个线程赋值,拷贝到旧值,导致变量内容的回退。

Queue 队列

队列是线程安全的线程间的通信方式。队列有三种模式:先进先出(Queue),后进后出(LifoQueue),以及优先级(PriorityQueue)(权重) 。
一个例子:

import threading
from queue import Queue
def test(q):
        while 1:
                i = q.get() #得到压入的数据
                print(i)
                q.put(i + 1) #压入
def test2(q):
        while 1:
                i = q.get() #如果另一个线程已经取到数据,直到在它压入数据前,都会阻塞这里,除非设置了超时时间为0
                print(i)
                q.put(i + 1)
if __name__ == '__main__':
        q = Queue(1) #新建队列大小为1的一个先进先出队列
        q.put(0) #压入一个数值
        t1 = threading.Thread(target=test,args=(q,))
        t2 = threading.Thread(target=test2,args=(q,))
        t1.start()
        t2.start()

这是一个很蠢的例子,但它演示了队列的工作方式,你会看到它打印的数字是连续的。

参考

https://www.cnblogs.com/ArsenalfanInECNU/p/10022740.html

这次使用Python来进行Websocket通信,使用Tornado自带的Websocket库,与游览器自带的api。

目录结构

.
├── static
│   └── index.html
└── websockettest.py

Python

#coding=utf-8
import tornado.web
import tornado.websocket
import tornado.httpserver
import tornado.ioloop

client_list = [] #连接用户列表

class WebSocketHandler(tornado.websocket.WebSocketHandler):
        def open(self): #连接进入
                #print(dir(self)) #康康对象有什么
                client_list.append(self)
                pass
        def on_message(self,message): #消息事件
                print(message)
                for client in client_list: #将消息发送到所有在线的连接
                        client.write_message(message)
        def on_close(self): #关闭事件
                client_list.remove(self)
                pass
        def check_origin(self,origin): #允许跨域访问
                print(origin)
                return True
class IndexPageHandler(tornado.web.RequestHandler):
        def get(self):
                self.render('index.html')

class Application(tornado.web.Application):
        def __init__(self):
                handlers = [(r'/',IndexPageHandler),(r'/websocket',WebSocketHandler)]
                settings = {'template_path':'static'} #设置模板文件目录
                tornado.web.Application.__init__(self,handlers,**settings)

if __name__ == '__main__':
        app = Application()
        tornado.httpserver.HTTPServer(app).listen(80)
        tornado.ioloop.IOLoop.instance().start()

Html & JavaSscript

<title>WebSockets Test</title>
<script src="https://cdn.bootcss.com/jquery/3.4.1/jquery.min.js"></script>
<script type="text/javascript">
var ws = new WebSocket("ws://ip/websocket"); #你可以在本地保存网页测试(跨域请求)
//ws.onopen = function(){
//      ws.send('hello!');
//}
ws.onmessage = function(msg){
        $('.message').append("<p>" + msg.data + "</p>");
}
ws.onclose = function(){
        $('.message').append("<p>Connection closed</p>");
}
function sendmsg(){
        var msg = $("#msg").val();
        ws.send(msg);
}
</script>
<div class='message'></div>
<textarea id="msg"></textarea>
<button type="but" onclick="sendmsg()">Send</button>

其他

上次Nodejs的Express-ws网页测试不成功的原因是Socket.io的问题,Socket.io会向socket.io/?EIO=&transport=websocket请求连 接,只有服务端一起使用Socket.io才能正常连接,使用游览器自带访问正常连接(更新:之前不了解Socket.io的工作模式写的不对的话)。