lex数软工作室

长期承接软件开发,数据采集,图像处理,技术咨询 微信:lex_workshop

python的静态网站迁移方案

前一段时间接到客户需求,想暂时迁移静态网站,可以使用如下方案处理:

代码梳理过程如下

使用mitm暂存所有请求的response,用于返回的网页

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
import asyncio
from mitmproxy import options
from mitmproxy.tools import dump

from mitmproxy import http
import hashlib,pickle,os,threading,time,base64,json
from bs4 import BeautifulSoup as Soup
import requests
from urllib.parse import urljoin

# In-memory cache of captured HTTP exchanges; saved to and reloaded from
# "store.pickle" by the code below.
mitmstore = {}


def saveFile():
    """Persist ``mitmstore`` to ``store.pickle`` in the working directory.

    The data is written to a temporary file first and then moved into place
    with ``os.replace``. An interrupted save therefore cannot leave a
    truncated ``store.pickle`` behind, which would make the loader below
    fail the next time this module is imported.
    """
    tmp_path = "store.pickle.tmp"
    with open(tmp_path, 'wb') as fw:
        pickle.dump(mitmstore, fw)
    os.replace(tmp_path, "store.pickle")


# Reload a previously saved cache automatically, if one exists.
# NOTE(review): pickle.load can execute arbitrary code embedded in the file;
# only load a store.pickle that was produced by saveFile() on a trusted machine.
if os.path.exists("store.pickle"):
    with open("store.pickle", 'rb') as fr:
        mitmstore = pickle.load(fr)

def md5(url: str, data: str):
    """Return the hex MD5 digest of ``url`` followed by ``data`` (both UTF-8 encoded)."""
    digest = hashlib.md5()
    digest.update(url.encode('utf-8'))
    digest.update(data.encode('utf-8'))
    return digest.hexdigest()

class Addon(object):
    """mitmproxy addon that records each response once and replays it afterwards.

    ``response`` stores the first response seen for a request fingerprint in
    ``mitmstore``; ``request`` answers any later request with the same
    fingerprint straight from that store.
    """

    @staticmethod
    def _flow_key(flow: http.HTTPFlow) -> str:
        """Return the store key for a flow: md5 of URL path + method + request body.

        Both hooks use this single helper so the recording key and the replay
        key cannot drift apart.
        """
        request = flow.request
        # get_text() may return None when no body is available; md5() needs a str.
        body = request.get_text() or ''
        return md5(getUrlPath(request.url) + request.method, body)

    def request(self, flow: http.HTTPFlow):
        """Short-circuit the request with the stored response, if there is one."""
        key = self._flow_key(flow)
        if key not in mitmstore:
            return
        print('request', key, flow.request.url)
        msg = mitmstore[key]
        flow.response = http.Response.make(
            msg['response_status_code'],
            msg['response_content'],
            msg['response_headers'],
        )

    def response(self, flow: http.HTTPFlow):
        """Record the exchange the first time its fingerprint is seen."""
        key = self._flow_key(flow)
        if key in mitmstore:
            return
        request = flow.request
        response = flow.response
        mitmstore[key] = {
            'request_url': request.url,
            'request_headers': request.headers,
            'request_body': request.get_text(),
            'request_method': request.method,
            'response_status_code': response.status_code,
            'response_content': response.content,
            'response_headers': response.headers,
        }

async def start_proxy(port):
    """Run a mitmproxy ``DumpMaster`` with ``Addon`` on 127.0.0.1:``port``.

    Awaits ``master.run()`` and returns the master object once it finishes.
    Terminal logging and the flow dumper are both disabled.
    """
    print('代理启动,端口号为',port)
    proxy_master = dump.DumpMaster(
        options.Options(listen_host='127.0.0.1', listen_port=port),
        with_termlog=False,
        with_dumper=False,
    )
    proxy_master.addons.add(Addon())
    await proxy_master.run()
    return proxy_master

python通用爬虫用于采集要显示的静态网站

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
def getUrlPath(url: str):
    """Return everything after the third ``/`` of ``url`` (path plus query), or ``''``.

    For ``scheme://host/path?query`` this is ``path?query``.
    """
    pieces = url.split('/', 3)
    if len(pieces) < 4:
        return ''
    return pieces[3]

class WebCrawlAll:
    """Same-site crawler that fetches every linked page and asset through a proxy.

    Starting from ``firstUrl`` it follows the link-bearing attributes listed
    in ``LINK_SOURCES`` and requests each discovered same-site URL through
    the configured HTTP proxy.
    """

    # (CSS selector, attribute) pairs whose values are treated as URLs to fetch.
    LINK_SOURCES = (
        ('a', 'href'),
        ('img', 'src'),
        ('link', 'href'),
        ('script', 'src'),
        ('embed', 'src'),
        ('area', 'href'),
    )

    def __init__(self, firstUrl, proxy='http://localhost:16767'):
        """
        :param firstUrl: absolute URL where the crawl starts
        :param proxy: proxy URL used for both http and https requests
        """
        self.toCrawl = [firstUrl]   # URLs still to fetch (used as a stack)
        self.dup = {firstUrl}       # every URL ever queued, to avoid repeats
        self.domain = self.getDomain(firstUrl)
        self.proxies = {'http': proxy, 'https': proxy}

    def getDomain(self, url: str):
        """Return the site part of ``url``'s host, used to decide "same site".

        A leading label is dropped only when the host has more than two
        labels, so ``www.example.com`` and ``example.com`` both give
        ``example.com``. Previously a bare ``example.com`` was reduced to
        ``com``, which made every two-label ``.com`` host look like the same
        site. Multi-part public suffixes such as ``co.uk`` are not recognised.

        :raises IndexError: if ``url`` has no ``scheme://host`` part
        """
        labels = url.split('/')[2].split('.')
        if len(labels) > 2:
            labels = labels[1:]
        return '.'.join(labels)

    def crawl(self, url):
        """Fetch ``url`` through the proxy and queue every same-site link it contains."""
        print('采集链接:\t%s' % url)
        aResp = requests.get(url, proxies=self.proxies)
        aDoc = Soup(aResp.text, 'html.parser')
        for selector, attr in self.LINK_SOURCES:
            for tag in aDoc.select(selector):
                href = tag.get(attr)  # None when the attribute is absent
                if href:
                    self.addUrl(url, href)

    def addUrl(self, url, href: str):
        """Queue ``href`` (resolved against ``url``) if it is new and on the same site.

        ``javascript:`` pseudo-links are ignored. Links without a host part
        (for example ``mailto:``) and links that cannot be parsed are skipped.
        """
        if not href:
            return
        try:
            if href.startswith("http"):
                target = href
            elif 'javascript' in href:
                return
            else:
                target = urljoin(url, href)
            if target in self.dup:
                return
            same_site = self.getDomain(target) == self.domain
        except (IndexError, ValueError):
            # IndexError: no host part; ValueError: urljoin rejected the URL.
            return
        if same_site:
            self.toCrawl.append(target)
            self.dup.add(target)

    def crawlAll(self):
        """Crawl until the queue is empty, then persist the captured responses.

        A failed request is reported and skipped so that one bad URL does not
        abort the crawl before ``saveFile()`` runs.
        """
        while self.toCrawl:
            url = self.toCrawl.pop()
            try:
                self.crawl(url)
            except requests.RequestException as e:
                print('采集失败:\t%s\t%s' % (url, e))
        saveFile()
        print('采集完成')

使用python把网页信息发送到串口设备或socket设备

如果需要把网页信息或报警信息发送到串口设备,可以使用如下方案

设计流程如下:

获得页面实时信息

方案一

使用mitm中间人攻击的方案,拦截页面请求,过滤自己需要信息,python伪码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import asyncio,json
from mitmproxy import options
from mitmproxy.tools import dump
from mitmproxy import http

class Addon(object):
    """mitmproxy addon that prints and parses the JSON of one specific POST endpoint."""

    def response(self, flow: http.HTTPFlow):
        """Inspect each response; handle only the GraphQL query endpoint called via POST."""
        url = flow.request.url            # request URL
        method = flow.request.method      # HTTP method

        status_code = flow.response.status_code   # response status code
        headers = flow.response.headers           # response headers

        # Filter for the page information we care about.
        wanted = 'rest/iecs/v1.0/graphql/query' in url and 'POST' in method
        if not wanted:
            return

        print(url)
        try:
            body_text = flow.response.content.decode('utf-8')  # response body
            print(body_text)
            res = json.loads(body_text)['data']
            # Parse the relevant data here and send it to the serial port.
        except Exception as e:
            print(e)

async def start_proxy(port):
    """Start a mitmproxy ``DumpMaster`` with ``Addon`` listening on 127.0.0.1:``port``.

    Awaits ``master.run()`` and returns the master object afterwards.
    """
    print('代理启动,端口号为',port)
    proxy_options = options.Options(listen_host='127.0.0.1', listen_port=port)
    proxy_master = dump.DumpMaster(proxy_options, with_termlog=False, with_dumper=False)
    proxy_master.addons.add(Addon())
    await proxy_master.run()
    return proxy_master


# Run the proxy on port 8080.
asyncio.run(start_proxy(8080))

写一个爬虫定时去采集网页对应的数据

爬虫千千万,需要您自己编写了

给串口或socket发送信息

给socket发送信息

1
2
3
4
5
6
7
8
9
10
11
12
13
import socket

class TCPControl:
    """Small TCP client that sends GBK-encoded text messages to a device."""

    def __init__(self, ip: str, port: int):
        """Open a TCP connection to ``(ip, port)``.

        :raises OSError: if the connection cannot be established; the socket
            is closed before the error is re-raised so it does not leak.
        """
        self.tcp_client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self.tcp_client_socket.connect((ip, port))
        except OSError:
            self.tcp_client_socket.close()
            raise

    def sendmsg(self, msg: str):
        """Send ``msg`` encoded as GBK.

        ``sendall`` is used instead of ``send`` because ``send`` may transmit
        only part of the buffer.
        """
        self.tcp_client_socket.sendall(msg.encode("gbk"))

    def close(self):
        """Close the connection."""
        self.tcp_client_socket.close()

给串口设备发送信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import serial,time

class RelayControl:
    """Sends alarm and heartbeat frames to a lower-level controller over a serial port."""

    def __init__(self, port='COM5', baudrate=9600):
        """Open the serial port and wait until it reports open.

        :param port: serial port name
        :param baudrate: serial baud rate
        """
        self.ser = serial.Serial(port, baudrate)
        while not self.ser.is_open:
            time.sleep(0.1)
        time.sleep(0.2)
        # Maps the alarm-type key passed to alart() to the code put in the frame.
        self.types = {
            "1": "10000",
            "2": "20000",
            "3": "50000"
        }
        print('下位机已经连上')

    def _sendMsg(self, code: str):
        """Write ``code`` to the serial port, GBK-encoded.

        The hex dump printed for debugging is taken from the same GBK bytes
        that are written, so the log shows exactly what goes on the wire
        (previously the dump used UTF-8 and differed for non-ASCII text).

        :param code: frame text to send
        :return: None
        """
        payload = code.encode("gbk")
        print(payload.hex())
        self.ser.write(payload)

    def alart(self, level: str, ajlxmc: str, clxx: list, afdz: str):
        """Send an alarm frame ``BJ:<level>,<type code>,<count>,<items...>,<afdz>,OK``.

        Example frame: ``BJ:1,50000,2,vehicle1,vehicle2,location,OK``.

        :param level: alarm level, placed first in the frame
        :param ajlxmc: key into ``self.types`` selecting the type code
        :param clxx: list of strings; its length and comma-joined items go in the frame
        :param afdz: string placed just before the trailing ``OK``
        :raises KeyError: if ``ajlxmc`` is not a key of ``self.types``
        """
        # NOTE(review): `lock` is not defined in this snippet; presumably a
        # module-level threading.Lock shared by all senders. Confirm.
        with lock:
            frame = 'BJ:%s,%s,%d,%s,%s,OK' % (
                level, self.types[ajlxmc], len(clxx), ','.join(clxx), afdz)
            print(frame)
            self._sendMsg(frame)
            print('已经发送成功')

    def sendHeart(self):
        """Send a heartbeat frame ``XT:<YYYYmmddHHMMSS>,OK`` with the current local time."""
        # NOTE(review): see alart() regarding the externally defined `lock`.
        with lock:
            now = time.strftime('%Y%m%d%H%M%S', time.localtime())
            self._sendMsg("XT:%s,OK" % now)
            print('心跳发送成功')

使用python接收本地手机号短信

如果需要长期接收短信并据此处理业务,可使用如下方案

设计流程如下:

购买相应的usb接收短信的硬件设备

我使用的是EC600N 4G USB口模块,各位请自行在淘宝里面购买,此设备为4G三网通设备

python相应代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
#短信
import serial,time,hashlib
import binascii
import re


def hexToStr(hexStr: str):
    """Decode a hex string as UTF-8 text and strip NUL escapes and single quotes.

    The result is the ``repr`` of the decoded text with every ``\\x00``
    escape and every ``'`` removed.
    """
    raw = binascii.unhexlify(hexStr.encode('utf-8'))
    shown = repr(raw.decode('utf-8'))
    for unwanted in ('\\x00', "'"):
        shown = shown.replace(unwanted, '')
    return shown

def unicodeToStr(unicodeStr: str):
    """Turn a string of 4-hex-digit code units (e.g. ``4F60597D``) into text.

    Each group of four characters is prefixed with ``\\u`` and the result is
    decoded with the ``unicode_escape`` codec.
    """
    escaped = ''.join(
        r'\u' + unicodeStr[pos:pos + 4]
        for pos in range(0, len(unicodeStr), 4)
    )
    return escaped.encode().decode("unicode_escape")

def strToUnicode(msg: str):
    """Return ``msg`` as lowercase hex of its UTF-16-BE code units (4 digits each).

    For example ``"A中"`` becomes ``"00414e2d"``. The previous per-character
    ``unicode_escape`` approach gave the same result for printable ASCII and
    for characters from U+0100 up to U+FFFF, but produced garbage for
    backslash, control characters, U+0080 to U+00FF and characters above
    U+FFFF. Encoding as UTF-16-BE handles all of them; characters above
    U+FFFF become a surrogate pair.

    The result is also printed, as before, for debugging.
    """
    encoded = msg.encode('utf-16-be').hex()
    print(encoded)
    return encoded

class Ec600:
    """SMS helper that drives a modem with AT commands over a serial port."""

    def __init__(self, port='COM11', baudrate=115200):
        """Open the serial port (1 s read timeout) and wait until it reports open."""
        self.ser = serial.Serial(port, baudrate, timeout=1)
        while not self.ser.is_open:
            time.sleep(0.1)
        time.sleep(1)
        print('识别成功')

    def readMSM(self):
        """Read all stored SMS messages.

        :return: list of ``(sender, receive_time, text, local_number)`` tuples
        :raises IndexError: if a listed message lacks a sender or timestamp,
            or if the local number cannot be read (see ``getPhoneNum``)
        """
        self._sendMsg("AT\r\n")
        print(' '.join(self._readMsg()))
        self._sendMsg("at+cmgf=1\r\n")
        print(' '.join(self._readMsg()))
        # "ALL" lists every stored message; "REC UNREAD" would list only unread ones.
        self._sendMsg("AT+CMGL=\"ALL\"\r\n")
        msgs = self._excuteMSM(' '.join(self._readMsg()))
        results = []
        localNum = self.getPhoneNum()
        for line in msgs:
            phoneNum, receiveTime, msg = self.extractInformation(line)
            results.append((phoneNum, receiveTime, msg, localNum))
        return results

    def extractInformation(self, line):
        """Extract sender, timestamp and text from one ``+CMGL`` listing entry.

        :param line: one entry as returned by ``_excuteMSM``
        :return: ``(phoneNum, creatTime, msg)``; ``msg`` is ``''`` when the
            entry has no hex body or the body cannot be decoded
        :raises IndexError: if the sender or the timestamp is missing
        """
        # The sender is a run of "003d" groups; removing "003" leaves the digits.
        phoneNum = re.findall(r'(?<=READ",").+?(?=")', line)[0].replace('003', '')
        # Keep the first 17 characters ("yy/mm/dd,HH:MM:SS"), dropping the suffix.
        creatTime = re.findall(r'\d{2}/\d{2}/\d{2},\d{2}:\d{2}:\d{2}.\d{2}', line)[0][:17]
        bodies = re.findall(
            r'''(?<="\d{2}/\d{2}/\d{2},\d{2}:\d{2}:\d{2}.\d{2}" )[0-9A-F]+''', line)
        msg = ''
        if bodies:
            try:
                msg = unicodeToStr(bodies[0])
            except ValueError:
                # Hex body that does not decode cleanly (e.g. truncated).
                msg = ''
        return phoneNum, creatTime, msg

    def deleteMSM(self):
        """Delete messages using ``AT+CMGD=1,2``.

        :return: None
        """
        print('删除短息')
        self._sendMsg("AT\r\n")
        print(' '.join(self._readMsg()))
        self._sendMsg("at+cmgf=1\r\n")
        print(' '.join(self._readMsg()))
        self._sendMsg("AT+CMGD=1,2\r\n")
        print(' '.join(self._readMsg()))

    def _readMsg(self) -> list:
        """Read all pending lines from the port and return the non-empty ones.

        Undecodable bytes are dropped instead of raising, so line noise on
        the serial link does not abort a read.
        """
        if not self.ser.readable():
            return []
        res = []
        for line in self.ser.readlines():
            msg = line.decode(errors="ignore").replace("\r", "").replace("\n", "")
            if msg:
                res.append(msg)
        return res

    def _sendMsg(self, code: str):
        """Write ``code`` to the serial port using the default (UTF-8) encoding."""
        self.ser.write(code.encode())

    def _excuteMSM(self, msm: str):
        """Split a joined ``AT+CMGL`` reply into one string per message that has a hex body."""
        return re.findall(r'''.CMGL.+?"\d+/\d+/\d+,\d+:\d+:\d+.\d+" [0-9A-F]+''', msm)

    def sendMsg(self, phoneNum, msg: str):
        """Send ``msg`` to ``phoneNum`` as a UCS2-encoded SMS.

        The command order is significant and is kept exactly as before.
        """
        self._sendMsg("AT\r\n")
        # NOTE(review): the original comment said "PDU mode", but AT+CMGF=1 is
        # normally text mode; confirm against the modem's AT manual.
        self._sendMsg('AT+CMGF=1\r\n')
        self._sendMsg('AT+CSMP=17,167,2,25\r\n')
        # Select the UCS2 character set.
        self._sendMsg('AT+CSCS="UCS2"\r\n')
        # Destination number as hex code units.
        self._sendMsg('AT+CMGS="' + self._turnPhoneNum(phoneNum) + '"\r\n')
        print(' '.join(self._readMsg()))
        # Message text as hex code units.
        self._sendMsg(strToUnicode(msg))
        # Ctrl-Z (0x1A) terminates the message.
        self._sendMsg('\x1A\r\n')
        print(' '.join(self._readMsg()))

    def _turnPhoneNum(self, phoneNum: str):
        """Prefix every character of ``phoneNum`` with ``003`` (e.g. ``"13"`` -> ``"00310033"``)."""
        encoded = ''.join('003' + digit for digit in phoneNum)
        print(encoded)
        return encoded

    def getPhoneNum(self):
        """Return the 11 digits following ``+86`` in the ``AT+CNUM`` reply.

        :raises IndexError: if the reply contains no such number
        """
        self._sendMsg("AT\r\n")
        print(' '.join(self._readMsg()))
        self._sendMsg("AT+CNUM\r\n")
        phoneStr = ' '.join(self._readMsg())
        return re.findall(r'(?<=\+86)[0-9]{11}', phoneStr)[0]

使用上述代码,在Windows上查看COM口号,在Linux上查看/dev/tty设备号,设置Ec600的port参数,调用相应方法即可读取短信

使用python定时发送gmail邮件

如果只是用python定时发送邮件很简单,但是国内无法直接连接gmail,需要使用vpn做桥接,然后登录发邮件

代码梳理过程如下

使用国内邮箱定时发邮件代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
import smtplib
from email.header import Header
from email.mime.application import MIMEApplication
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from smtplib import SMTP_SSL

class MailControl(object):
    """Thin wrapper around ``smtplib`` for building and sending multipart mails."""

    def __init__(self, mail_host, mail_user, mail_pass, mail_name):
        """
        :param mail_host: SMTP server host name
        :param mail_user: login name, also used as the envelope sender
        :param mail_pass: login password
        :param mail_name: value for the ``From`` header
        """
        self.mail_host = mail_host
        self.mail_user = mail_user
        self.mail_pass = mail_pass
        self.mail_name = mail_name

    def login(self):
        """Connect to the server on port 587, upgrade with STARTTLS and log in.

        :return: None
        """
        # Port 587 with STARTTLS; an SMTP_SSL connection on port 465 is the alternative.
        self.server = smtplib.SMTP(host=self.mail_host, port=587)
        self.server.ehlo()
        self.server.starttls()
        self.server.login(self.mail_user, self.mail_pass)

    def send(self, msg: MIMEMultipart, to_addr):
        """Send ``msg`` to ``to_addr`` using the logged-in connection.

        :param msg: message to send
        :param to_addr: recipient address(es)
        :return: whatever ``smtplib.SMTP.send_message`` returns
        """
        return self.server.send_message(from_addr=self.mail_user, to_addrs=to_addr, msg=msg)

    def quit(self):
        """Log out and close the SMTP connection.

        :return: None
        """
        self.server.quit()

    def createHeader(self, title, To: list, CC: list, BCC: list):
        """Create an empty multipart message with its headers filled in.

        :param title: subject line
        :param To: recipient strings, joined with commas
        :param CC: carbon-copy recipients; the header is omitted when empty
        :param BCC: blind-copy recipients; the header is omitted when empty
        :return: the new ``MIMEMultipart`` message
        """
        msg = MIMEMultipart()
        msg['From'] = Header(self.mail_name, 'utf-8')
        msg['To'] = Header(','.join(To), 'utf-8')
        if CC:
            msg['CC'] = Header(','.join(CC), 'utf-8')
        if BCC:
            msg['BCC'] = Header(','.join(BCC), 'utf-8')
        msg['Subject'] = Header('%s' % title, 'utf-8')
        return msg

    def addContent(self, msg: MIMEMultipart, content, msgType="plain"):
        """Attach ``content`` as a UTF-8 text part.

        :param msgType: MIME subtype, ``"plain"`` or ``"html"``
        :return: ``msg``
        """
        msg.attach(MIMEText(content, msgType, 'utf-8'))
        return msg

    def addAttach(self, msg: MIMEMultipart, fileName):
        """Attach the file ``attachment/<fileName>`` to ``msg``.

        The file is opened in a ``with`` block so the handle is closed, and
        the stray leading space in the directory name (``' attachment/'``)
        has been removed.

        :param msg: message to extend
        :param fileName: file name inside the ``attachment`` directory
        :return: ``msg``
        """
        with open('attachment/' + fileName, 'rb') as attachment:
            apart = MIMEApplication(attachment.read())
        apart.add_header('Content-Disposition', 'attachment', filename=fileName)
        msg.attach(apart)
        return msg

    def addSignature(self, msg: MIMEMultipart, name: str, sendCon: str):
        """Attach an HTML body built from the template ``mailSignature/<name>.htm``.

        The ``{{正文}}`` placeholder in the template is replaced with ``sendCon``.

        :param msg: message to extend
        :param name: template name without extension
        :param sendCon: body text inserted into the template
        :return: ``msg``
        """
        with open(file="mailSignature/" + name + ".htm", encoding="utf-8") as file:
            content = file.read().replace("{{正文}}", sendCon)
        self.addContent(msg, content, "html")
        return msg


单纯使用上面的代码在国内是无法发送邮件的,因为有墙的原因

给邮件设置vpn的代理

1
2
3
4
5
6
import socks
import socket
# 12345 is the local port of the VPN/proxy client; look up the actual port in your client.
socks.setdefaultproxy(socks.HTTP, 'localhost', 12345)
# Replace socket.socket so that sockets created afterwards use the proxy set above.
socket.socket=socks.socksocket

上述代码已经可以使邮件协议走vpn的通道

gmail发送邮件的逻辑代码

1
2
3
4
5
6
7
8
9
10
# Example: send one mail, with optional attachments, through Gmail.
control = MailControl("smtp.gmail.com", "gmail用户名", "gmail密码","gmail用户名<gmail用户名@gmail.com>")
control.login()  # log in
try:
    mail = "kehu@gmail.com"
    msg = control.createHeader("发送邮件标题", ["%s<%s>" % ("客户邮件名", mail)], [], [])
    msg = control.addContent(msg, "<p>邮件正文</p>", "html")
    # NOTE(review): `attachs` is not defined in this snippet; presumably a list
    # of attachment file names prepared by the caller. Confirm.
    for attach in attachs:
        # Add each attachment, if there are any.
        msg = control.addAttach(msg, attach)
    rev = control.send(msg, [mail])
finally:
    # Always close the SMTP session, even if building or sending the mail fails.
    control.quit()

使用上述代码即可用gmail给客户发送邮件

📕 end of posts 📕