lex数软工作室

长期承接软件开发,数据采集,图像处理,技术咨询 微信:lex_workshop

使用python把网页信息发送到串口设备或socket设备

如果需要把网页信息或报警信息发送到串口设备,可以使用如下方案

设计流程如下:

获得页面实时信息

方案一

使用mitm中间人攻击的方案,拦截页面请求,过滤自己需要信息,python伪码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import asyncio,json
from mitmproxy import options
from mitmproxy.tools import dump
from mitmproxy import http

class Addon(object):

def response(self,flow: http.HTTPFlow):
request_url=flow.request.url #获得url
request_method=flow.request.method #访问方法

response_status_code=flow.response.status_code #状态码
response_headers=flow.response.headers #获得访问
if 'rest/iecs/v1.0/graphql/query' in request_url and 'POST' in request_method: #过滤自己需要的页面信息
print(request_url)
try:
response_content = flow.response.content.decode('utf-8') # 访问内容
print(response_content)
res=json.loads(response_content)['data']
#解析相应的数据,并发送到串口
except Exception as e:
print(e)

async def start_proxy(port):
print('代理启动,端口号为',port)
opts = options.Options(listen_host='127.0.0.1', listen_port=port)

master = dump.DumpMaster(
opts,
with_termlog=False,
with_dumper=False,
)
master.addons.add(Addon())
await master.run()
return master

asyncio.run(start_proxy(8080))

写一个爬虫定时去采集网页对应的数据

爬虫千千万,需要您自己编写了

给串口或socket发送信息

给socket发送信息

1
2
3
4
5
6
7
8
9
10
11
12
13
import socket

class TCPControl:
def __init__(self,ip:str,port:int):
self.tcp_client_socket=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
self.tcp_client_socket.connect((ip, port))

def sendmsg(self,msg:str):
self.tcp_client_socket.send(msg.encode("gbk"))

def close(self):
self.tcp_client_socket.close()

给串口设备发送信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import serial,time

class RelayControl:

def __init__(self,port='COM5',baudrate=9600):
self.ser=serial.Serial(port, baudrate)
while not self.ser.is_open:time.sleep(0.1)
time.sleep(0.2)
self.types={
"1": "10000",
"2": "20000",
"3": "50000"
}
print('下位机已经连上')

def _sendMsg(self,code:str):
'''
给下位机发送消息
:param code:
:return:
'''
print(code.encode().hex())
self.ser.write(code.encode("gbk"))

def alart(self,level:str,ajlxmc:str,clxx:list,afdz:str):
#BJ:3,20000,0,OK
with lock:
# BJ:1,50000,2,测试车1,测试车2,测试地点,OK
print('BJ:%s,%s,%d,%s,%s,OK' % (level,self.types[ajlxmc],len(clxx),','.join(clxx),afdz))
self._sendMsg('BJ:%s,%s,%d,%s,%s,OK' % (level,self.types[ajlxmc],len(clxx),','.join(clxx),afdz))
print('已经发送成功')
#self._printMsg(self._readMsg())

def sendHeart(self):
with lock:
now=time.strftime('%Y%m%d%H%M%S', time.localtime(time.time()))
self._sendMsg("XT:%s,OK" % now)
print('心跳发送成功')

⬅️ Go back