2019年7月

Flask-Mail是Flask的邮件扩展,能让我们以简单的方式方式邮件。

安装

pip3 install flask_mail

配置

MAIL_SERVER : 服务器地址,默认为 ‘localhost’
MAIL_PORT : 服务器端口,默认为 25
MAIL_USE_TLS : 使用TLS,默认为 False
MAIL_USE_SSL : 使用SSL,默认为 False
MAIL_DEBUG : 是否打开Debug,默认为 app.debug
MAIL_USERNAME : 用户名,默认为 None
MAIL_PASSWORD : 密码(授权码),默认为 None
MAIL_DEFAULT_SENDER : 默认发送者,默认为 None
MAIL_MAX_EMAILS : 重新连接前发送邮件最大数目,默认为 None
MAIL_SUPPRESS_SEND : 启用发送(True为不发送),默认为 app.testing
MAIL_ASCII_ATTACHMENTS : 附件文件名转换为ASCII,默认为 False

示例(同步)

from flask import Flask
from flask_mail import Mail,Message
app = Flask(__name__)

#app.config['MAIL_DEBUG'] = True
app.config['MAIL_SUPPRESS_SEND'] = False
app.config['MAIL_SERVER'] = 'smtp.qq.com'
app.config['MAIL_PORT'] = 465
app.config['MAIL_USE_SSL'] = True
app.config['MAIL_USERNAME'] = '你的邮箱'
app.config['MAIL_PASSWORD'] = '密码(授权码)'
app.config['MAIL_DEFAULT_SENDER'] = '发件人邮箱'
mail = Mail(app)

@app.route('/')
def index():
    msg = Message(subject="标题",sender="发送者邮箱",recipients=[接收者邮箱1,接收者邮箱2...])
    #发送HTML或文本信息
    msg.html = '<h1>hello</h1>'
    #msg.body = 'hello'
    #发送
    mail.send(msg)
    return 'index'

if __name__ == '__main__':
    app.run('0.0.0.0',8080)

异步解决

#省略上面的配置和路由
#(下面代码在路由中)
from threading import Thread
from flask import current_app
def send_mail(app,msg):
    #应用上下文管理器
    with app.app_context():
        mail.send(msg)
#启用线程来异步发送,传入app实例与msg对象
thr = Thread(target = send_mail, args = [app,msg])
thr.start()

在工厂模式下没有app实例怎么办,只需要在线程启动前添加这样一行来获取:

app = current_app._get_current_object()

关于current_app._get_current_object()的讨论:
https://segmentfault.com/q/1010000005865632/a-1020000005865704
Flask_Mail中文文档:
http://www.pythondoc.com/flask-mail/index.html

就这个模块,挺便宜的,好像销量很大。

86_0.jpg
[手机拍摄]
我买了两个,用于控制水泵,然而并不能用,排除常见问题后,开始怀疑模块的可用性,于是去谷歌搜了一圈,得到一个结论,设计有缺陷。
它有两个缺陷:

  • 本应接3.3v的CH_DP悬空
  • 使用GPIO0驱动,然而被串了标号103的10K电阻下拉,上电进入下载模式

所以有了两个解决方案:

  1. 同样,将CH_DP脚接一个1K(可不接)到3.3v
  2. 方法一:将下拉的标号103的10K电阻去除
  3. 方法二:割断GPIO0与三极管的连接,将基极连接到GPIO2驱动

我选用了方法一,修好了这个模块,唯一的不足是上电继电器会吸合一会后断开(很短暂),不过也没多大的问题。
我很好奇ESP8266上电自动引脚置高电平是怎么回事。

终于可以不再拼接字符串,而可以直接使用文件系统。

安装FS上传插件

在如下地址下载:

https://github.com/esp8266/arduino-esp8266fs-plugin/releases

将获得的压缩包解压,将ESP8266FS文件夹复制到你的Arduino安装目录的tools文件夹里,重启IDE。
注意:它好像挑版本,我使用1.6.7会报错,使用1.8.5正常上传。

一些说明

1.如何选择我的SPIFFS大小

  • 在你的IDE菜单中(工具 > Falsh Size > 4M(1M SPIFFS)),视情况选择大小。(Falsh)一般ESP01为1M大小,ESP12为4M大小。

2.是否有文件限制

  • 传,都可以传(只要有空间),点击工具菜单中ESP8266 SKetch Data Upload来上传data中的文件。

3.FS库的文档

使用

在你的项目路径会自动生成(如果没有请手动新建)data文件夹,里面存放你要上传的文件,比方说html,css之类的。
我们放入一个index.html,来试一试:

#include <ESP8266WiFi.h>
#include <WiFiClient.h>
#include <ESP8266WebServer.h>
#include "FS.h"
const char* ssid = "SSID";
const char* password = "PASSWORD";
ESP8266WebServer server(80);
void Route_Index() {
  if(SPIFFS.exists("/index.html")){ //文件存在检测
    File f = SPIFFS.open("/index.html", "r");
    if(!f){ //异常处理
      server.send(500,"text/html","Error on opening!");
    }else{
      String data = f.readString();
      f.close();
      server.send(200,"text/html",data);
    }
  }else{
   server.send(404,"text/html","Not found!");
  }
}
void setup(void){
  Serial.begin(115200);
  WiFi.begin(ssid, password);
  while (WiFi.status() != WL_CONNECTED) {
    delay(500);
    Serial.print(".");
  }
  Serial.println("IP: ");
  Serial.print(WiFi.localIP());
  server.on("/",Route_Index);
  server.begin();
  if(!SPIFFS.begin()){ //初始化
    Serial.println("File system init failed!");
  }
}

就让我来试试传说中最适用于IOT的MQTT协议。

安装

虽然搜索资料很多,但大多是MQTT的使用,尽管有搭建服务器的文章,但我感觉写的不太清楚,大多数文章选择了Mosquitto(也许是Eclipse大厂出品的原因)。
经过寻找,找到了Nodejs写的mosca,但在Pi上老是安装失败,翻了翻Issues,找到了同作者写的依赖性小,轻量化的aedes。

npm install aedes --save


//最小例子
var aedes = require('aedes')();
var server = require('net').createServer(aedes.handle);
server.listen(8266);

简单使用

将所有的订阅与推送保存到sqlite3数据库中:

//Nodejs
var aedes = require('aedes')();
var colors = require('colors');
var server = require('net').createServer(aedes.handle);
var sqlite3 = require('sqlite3').verbose();
var db = new sqlite3.Database('data.db');
var port = 8266;
//更多事件查看Github:https://github.com/mcollina/aedes
aedes.on('publish',function(packet,client) {
    if (client) {
        console.log('[ Publish ] CilentID:'.green,client.id,' Qos:'.green,packet.qos,' Data:[ '.green,String(packet.payload),' ]'.green);
        db.run("INSERT INTO publish (date,client_id,topic,data) VALUES (datetime('now'),?,?,?);",[client.id,packet.topic,String(packet.payload)]);
    }
});
aedes.on('subscribe', function (subscriptions, client) {
    if (client) {
        //subscriptions懒得历遍(一般情况同时只有一个吧)
        db.run("INSERT INTO subscribe (date,client_id,topic) VALUES (datetime('now'),?,?)",[client.id,subscriptions[0].topic]);
        console.log('[ Subscribe ] SubscripTions:'.green,subscriptions[0].topic,' Qos:'.green,subscriptions[0].qos,' CilentID:'.green,client.id);
    }
});
aedes.on('unsubscribe',function(unsubscriptions,client){
    if(client){
        //同理
        console.log('[ unSubscribe ] unSubscripTions:'.green,unsubscriptions[0],' CilentID:'.green,client.id);
        db.run("DELETE FROM subscribe WHERE client_id = ? AND topic = ?;",[client.id,unsubscriptions[0]])
    }
});
server.listen(port,function(){
    console.log('[ Server ] server listening on port'.green,port)
});

//Sqlite3
CREATE TABLE "publish" ( `date` TEXT NOT NULL, `client_id` TEXT NOT NULL, `topic` TEXT NOT NULL, `data` TEXT NOT NULL );
CREATE TABLE "subscribe" ( `date` TEXT NOT NULL, `client_id` TEXT NOT NULL, `topic` TEXT NOT NULL );

//Micropython for ESP8266
>>> from umqtt.simple import MQTTClient                                                                             
>>> conn = MQTTClient('esp8266','192.168.1.64',8266)
>>> conn.connect()
0
>>> conn.publish(b'/test',b'test')
>>> conn.disconnect()

优缺点

  • 轻量化
  • 可以更好的结合业务逻辑
  • 不支持或不完全支持Qos2

Select适用于类UNIX系统,可用于Socket,File等有文件描述符的动态轮询。

服务端

#Py3环境下,现在大部分迁移到Py3了
import socket
import select
#监听配置
s = socket.socket()
s.bind(('0.0.0.0',8500))
s.listen(1024)
#连接列表,包括服务器
conn_socket = [s,]
#存储可写连接
write_socket = []
while True:
    #可读列表,可写列表,错误列表,超时时间(单位秒,空为阻塞)
    read_list,write_list,error = select.select(conn_socket,write_socket,[],1)
    #轮询开始,将服务器句柄或客户句柄交给内核轮询,当所轮询句柄有变化时,会出现在read_list或error中
    #程序提交 -> 内核轮询 -> 程序操作
    print(conn_socket)
    for client in read_list:
            if client == s:
                    #接收一个新客户,存入连接列表
                    conn,addr = client.accept()
                    conn_socket.append(conn)
            else:
                    #接收客户数据
                    try:
                            msg = client.recv(1024)
                    except Exception as ex:
                            #客户端断线错误
                            conn_socket.remove(client)
                            write_socket.remove(client)
                    else:
                            #打印接收数据,并添加到可写列表
                            print(msg)
                            write_socket.append(client)
    for client in write_list:
            #向可写列表中的客户发送消息,并从可写列表移除,直到再次客户发送消息
            client.sendall(bytes(('2hello2').encode('utf-8')))
            write_socket.remove(client)
    for client in error:
            #移除发生错误的客户端
            write_socket.remove(client)
            conn_socket.remove(client)

客户端

import socket
s = socket.socket()
s.connect(('127.0.0.1', 8500))
while True:
    i = input('> ')
    s.sendall(bytes(i, encoding='utf-8'))
    rx = str(s.recv(1024),encoding='utf-8')
    print(rx)
s.close()

不足之处

  1. 最多1024的文件描述符(即1024个连接)
  2. 当连接客户增加时,轮询一次会消耗更多时间(程序空间与内核空间的copy,轮询的时间)

Select模块还有poll和epoll事件模型,一般选择epoll较优。

0716更新:

修复了客户端断开时服务器产生错误而崩溃:

import socket
import select
s = socket.socket()
s.bind(('0.0.0.0',8500))
s.listen(1024)
conn_socket = [s,]
write_socket = []
while True:
    read_list,write_list,error = select.select(conn_socket,write_socket,[])
    print('正在监听',conn_socket)
    for client in read_list:
            if client == s:
                    conn,addr = client.accept()
                    conn_socket.append(conn)
            else:
                    try:
                            msg = client.recv(1024)
                    except Exception as ex:
                            conn_socket.remove(client)
                    else:
                            if msg != b'':
                                    print(msg)
                                    if client not in write_socket:
                                            write_socket.append(client)
                            else:
                                    conn_socket.remove(client)
    for client in write_list:
            try:
                    client.sendall(bytes(('2hello2').encode('utf-8')))
                    write_socket.remove(client)
            except:
                    conn_socket.remove(client)
                    write_socket.remove(client)
    for client in error:
            write_socket.remove(client)
            conn_socket.remove(client)

Nginx("engine x")是一款是由俄罗斯的程序设计师Igor Sysoev所开发高性能的 Web和 反向代理 服务器,也是一个 IMAP/POP3/SMTP 代理服务器。

安装

通过包管理器安装

(debain/ubuntu) sudo apt-get install nginx

编译安装

有人写了一键安装脚本,安装时间稍微长一些,如果没有特殊需求的话,一般使用包管理器提供的版本,尽管不是最新,但一般情况下很稳定。
https://lnmp.org/auto.html

基础配置

示例版本:nginx/1.14.2

重启Nginx:(debain/ubuntu) sudo servcie nginx restart

基础网站配置

所有的网站配置都在/etc/nginx/sites-available/(通过包管理器安装的,如果编译安装,请查看编译配置)中,默认有一个default配置
我们把注释删减下,得到如下的配置:

server {
    #监听端口 ipv4&ipv6
    listen 80;
    listen [::]:80;
    #网站根目录
    root /var/www/html;
    #主页
    index index.html index.htm index.nginx-debian.html;
    #服务器名称,如果有多个站点,这里填写你的域名
    server_name _;
    #路径配置块
    location / {
        #404配置,寻找文件,先寻找文件后寻找目录
        try_files $uri $uri/ =404;
    }
}

PHP配置

做站怎么能没有PHP,在配置中添加:

location ~ \.php$ {
    #注意你的PHP版本,这里配置的是php7.3-fpm
    include snippets/fastcgi-php.conf;
    fastcgi_pass unix:/run/php/php7.3-fpm.sock;
}

编译安装的这里会报错,因为你没有snippets/fastcgi-php.conf这个配置文件(这个配置中又引用了其他配置,记得全部拷贝回来), 你可以在编译前从包管理器的版本中拷贝一份,记得删除包管理器版本。

安全配置

现在的站点基本配有SSL,我们也要想办法装上一个

获取证书

你可以在这搞一张证书:https://freessl.cn/
当然也可以买一张便宜的证书,一般是Sectigo(Comodo),的比较便宜,一年5刀

SSL配置

在1.11.0之后的版本中,支持RSA与ECC双证书,填写两张证书路径,配置优先ECC算法即可。

#将你的监听端口改为:
listen 443 ssl;
listen [::]:443 ssl;
#在老版本中,启用ssl的配置为:
#ssl on;
#而不是在监听端口后加ssl
#证书/私钥配置(目录为你的证书存放目录)
ssl_certificate /etc/nginx/ssl_cert/main.crt;
ssl_certificate_key /etc/nginx/ssl_cert/main.key;
#其他ssl配置
ssl_session_timeout 5m; #会话过期时间
ssl_protocols TLSv1 TLSv1.1 TLSv1.2; #支持握手协议(通常使用TLSv1.2)
ssl_ciphers ECDHE-RSA-AES128-GCM-SHA256:HIGH:!aNULL:!MD5:!RC4:!DHE; #优先选择算法
ssl_prefer_server_ciphers on;  #启用服务器偏好配置

跳转设置

将http访问全部变为https访问:

#在上一个server{...}后新建:
server {
    listen 80;
    server_name yourdomain;
    rewrite ^(.*) https://$server_name$1 permanent;
}

关闭服务器token

Nginx会在响应头标识服务器版本,我们要关掉它(在/etc/nginx/nginx.conf(通过包管理器安装的,如果编译安装,请查看编译配置) 中):

#http段中
server_tokens off;

禁止访问

#禁止访问passwd路径(路径支持正则)下的所有文件
location /passwd {
    deny all;
}

优化配置

gzip压缩

使用gzip压缩来减少传输时间,将/etc/nginx/nginx.conf(通过包管理器安装的,如果编译安装,请查看编译配置)中关于gzip配置注 释删除即可。

gzip_vary on; #启用压缩标识
gzip_proxied any; #反向代理时,无条件启用压缩
gzip_comp_level 6; #压缩等级
gzip_buffers 16 8k; #缓存空间大小
gzip_http_version 1.1; #版本
gzip_types text/css application/json application/javascript text/xml application/xml application/xml+rss text/javascript; #启用压缩的文件类型

使用优化模型

#配置头直接填写
worker_processes auto; #自动配置工作进程数量
#worker_cpu_affinity 01 10 01 10; #不同进程绑定不同逻辑cpu(四进程双核示例)
worker_rlimit_nofile 65535; #Nginx能打开最大的文件描述符(Linux好像为65535)
#events段中
use epoll; #使用epoll轮询处理
# kqueue | rtsig | epoll | /dev/poll | select | poll (Nginx支持的事件模型)
worker_connections 1024; 每个进程最大的连接数

其它应用

反向代理

恰饭链接

浏览目录(文件服务器)

再次恰饭链接

后续

现在你可以开始关心网站的SEO,以及前端优化了。

共享全局变量

这是最简单的方法,也是最不线程安全的方法。
一个例子:

import time
import threading
i = 0
def test():
        global i
        while 1:
                i += 1
                #time.sleep(0.3)
def test2():
        global i
        while 1:
                i += 1
                #time.sleep(0.3)
if __name__ == '__main__':
        t1 = threading.Thread(target=test)
        t2 = threading.Thread(target=test2)
        t1.start()
        t2.start()
        for a in range(100):
                print(i)

结果好像好像出现了倒车。

OUTPUT: 42271 51380 51605 52942 53799 53823 55059 61042 63469 61928

由于线程速度太快,一个线程刚拷贝值,另一个已经赋值了,正要拷贝时,另一个线程赋值,拷贝到旧值,导致变量内容的回退。

Queue 队列

队列是线程安全的线程间的通信方式。队列有三种模式:先进先出(Queue),后进后出(LifoQueue),以及优先级(PriorityQueue)(权重) 。
一个例子:

import threading
from queue import Queue
def test(q):
        while 1:
                i = q.get() #得到压入的数据
                print(i)
                q.put(i + 1) #压入
def test2(q):
        while 1:
                i = q.get() #如果另一个线程已经取到数据,直到在它压入数据前,都会阻塞这里,除非设置了超时时间为0
                print(i)
                q.put(i + 1)
if __name__ == '__main__':
        q = Queue(1) #新建队列大小为1的一个先进先出队列
        q.put(0) #压入一个数值
        t1 = threading.Thread(target=test,args=(q,))
        t2 = threading.Thread(target=test2,args=(q,))
        t1.start()
        t2.start()

这是一个很蠢的例子,但它演示了队列的工作方式,你会看到它打印的数字是连续的。

参考

https://www.cnblogs.com/ArsenalfanInECNU/p/10022740.html