目的
用网页实时监控JX8MMA7实时运行状态
实现
1、安装tornado
2、安装psutil系统监控模块
源代码
import os
import time
import psutil
import platform
import hashlib
import re
import sys
from typing import List, Dict, Any
from cachelib import SimpleCache
# Shared cache instance; the network and disk-IO helpers in this file store
# their previous counter readings in it between calls.
cache = SimpleCache()
# True when os.name reports a POSIX platform; selects the Unix code paths.
UNIX: bool = os.name == 'posix'
# Platform name as returned by platform.system().
SYS: str = platform.system()
class CpuConstants:
    '''
    Holder for CPU facts that stay constant while the program runs.

    The values (socket count, model name, physical cores, logical threads)
    are collected once on construction and can be refreshed on demand with
    ``Update(True)``.
    '''

    def __init__(self):
        '''
        Create the holder and load the CPU constants immediately.
        '''
        # wmi.WMI() handle, created lazily and only on the Windows path.
        self.WMI = None
        self.initialed: bool = False
        self.cpuList: list = []
        self.cpuCount: int = 0
        self.cpuCore: int = 0
        self.cpuThreads: int = 0
        self.cpuName: str = ''
        self.Update(True)

    def Update(self, update: bool = False) -> None:
        '''
        Refresh the CPU constants for the current platform.

        Parameters
        ----------
        update : bool, optional
            Force a refresh even when the values were already loaded.
            The default is False.

        Returns
        -------
        None.
        '''
        if UNIX:
            self.GetCpuConstantsUnix(update)
        else:
            self.GetCpuConstantsWindows(update)
        self.initialed = True

    @property
    def getDict(self) -> Dict[str, Any]:
        '''
        Return the CPU constants as a dictionary.

        Returns
        -------
        Dict[str, Any]
            Keys: ``cpu_count``, ``cpu_name``, ``cpu_core``, ``cpu_threads``.
        '''
        if not self.initialed:
            self.Update()
        return {
            'cpu_count': self.cpuCount,
            'cpu_name': self.cpuName,
            'cpu_core': self.cpuCore,
            'cpu_threads': self.cpuThreads
        }

    def GetCpuConstantsUnix(self, update: bool = False) -> None:
        '''
        Load the CPU constants on Unix from /proc/cpuinfo.

        Parameters
        ----------
        update : bool, optional
            Force a refresh. The default is False.

        Returns
        -------
        None.
        '''
        if update or not self.initialed:
            # Each distinct "physical id" line counts as one CPU package.
            ids: list = re.findall("physical id.+", readFile('/proc/cpuinfo'))
            self.cpuCount = len(set(ids))
            self.cpuName = self.getCpuTypeUnix()
            # Forward the flag: without it a forced refresh on an already
            # initialised instance would skip the core/thread counts.
            self.GetCpuConstantsBoth(update)

    def InitWmi(self) -> None:
        '''
        Create the WMI handle (Windows only).

        Returns
        -------
        None.
        '''
        import wmi
        self.WMI = wmi.WMI()

    def GetCpuConstantsBoth(self, update: bool = False) -> None:
        '''
        Load the CPU constants that are obtained the same way on all platforms.

        Parameters
        ----------
        update : bool, optional
            Force a refresh. The default is False.

        Returns
        -------
        None.
        '''
        if update or not self.initialed:
            # psutil.cpu_count() may return None when the count cannot be
            # determined; store 0 so the attributes stay integers.
            self.cpuThreads = psutil.cpu_count() or 0
            self.cpuCore = psutil.cpu_count(logical=False) or 0

    def GetCpuConstantsWindows(self, update: bool = False) -> None:
        '''
        Load the CPU constants on Windows through WMI.

        Parameters
        ----------
        update : bool, optional
            Force a refresh. The default is False.

        Returns
        -------
        None.
        '''
        if update or not self.initialed:
            if self.WMI is None:
                self.InitWmi()
            self.cpuList = self.WMI.Win32_Processor()
            self.cpuCount = len(self.cpuList)
            # Guard against an empty processor list instead of raising IndexError.
            self.cpuName = self.cpuList[0].Name if self.cpuList else ''
            self.GetCpuConstantsBoth(update)

    @staticmethod
    def getCpuTypeUnix() -> str:
        '''
        Return the CPU model name on Unix.

        /proc/cpuinfo is searched first; when it has no "model name" entry
        the output of ``lscpu`` is searched instead.

        Returns
        -------
        str
            CPU model name, or an empty string when none was found.
        '''
        cpuinfo: str = readFile('/proc/cpuinfo')
        match = re.search(r'model\s+name\s+:\s+(.+)', cpuinfo, re.I)
        if match:
            return match.groups()[0]
        cpuinfo = ExecShellUnix('LANG="en_US.UTF-8" && lscpu')[0]
        match = re.search(r'Model\s+name:\s+(.+)', cpuinfo, re.I)
        return match.groups()[0] if match else ''
def GetCpuInfo(interval: int = 1) -> Dict[str, Any]:
    '''
    Sample CPU usage and combine it with the CPU constants.

    Parameters
    ----------
    interval : int, optional
        Passed to psutil.cpu_percent for the overall reading.
        The default is 1.

    Returns
    -------
    Dict[str, Any]
        ``used`` (overall percent), ``used_list`` (per-CPU percents) and
        the entries of ``cpuConstants.getDict``.
    '''
    time.sleep(0.5)
    totalPercent: float = psutil.cpu_percent(interval)
    perCpuPercent: List[float] = psutil.cpu_percent(percpu=True)
    info: Dict[str, Any] = {"used": totalPercent, "used_list": perCpuPercent}
    info.update(cpuConstants.getDict)
    return info
def readFile(filename: str) -> str:
    '''
    Read a UTF-8 text file, returning an empty string on failure.

    Parameters
    ----------
    filename : str
        Path of the file to read.

    Returns
    -------
    str
        File content, or '' when the file cannot be opened or decoded.
    '''
    try:
        with open(filename, 'r', encoding='utf-8') as file:
            return file.read()
    except (OSError, ValueError):
        # OSError: missing/unreadable file. ValueError: undecodable bytes
        # (UnicodeDecodeError) or an invalid path. Unlike a bare ``except``
        # this lets KeyboardInterrupt/SystemExit propagate.
        return ''
def GetLoadAverage() -> dict:
    '''
    Return the system load averages together with reference thresholds.

    Returns
    -------
    dict
        ``one``/``five``/``fifteen`` load averages (zeros where
        os.getloadavg is unavailable), ``max`` and ``limit`` (twice the
        logical CPU count) and ``safe`` (75% of ``max``).
    '''
    try:
        loads = os.getloadavg()
    except (AttributeError, OSError):
        # AttributeError: the function does not exist on this platform;
        # OSError: the load average could not be obtained.
        loads = (0, 0, 0)
    data: dict = dict(zip(('one', 'five', 'fifteen'), loads))
    # cpu_count() may return None; fall back to one CPU instead of raising.
    data['max'] = (psutil.cpu_count() or 1) * 2
    data['limit'] = data['max']
    data['safe'] = data['max'] * 0.75
    return data
def GetMemInfo() -> dict:
    '''
    Return memory information using the collector for this platform.

    Returns
    -------
    dict
        Result of GetMemInfoUnix() when UNIX is true, otherwise of
        GetMemInfoWindows().
    '''
    collector = GetMemInfoUnix if UNIX else GetMemInfoWindows
    return collector()
def GetMemInfoUnix() -> Dict[str, Any]:
    '''
    Return memory information on Unix, in MB.

    Returns
    -------
    Dict[str, Any]
        ``memTotal``, ``memFree``, ``memBuffers``, ``memCached``,
        ``memRealUsed`` (total - free - buffers - cached) and the used
        percentage under ``memUsedPercent``. The percentage is also stored
        under the misspelled key ``menUsedPercent`` because the Windows
        variant and the web page in this file use that spelling.
    '''
    mem = psutil.virtual_memory()
    memInfo: dict = {
        'memTotal': ToSizeInt(mem.total, 'MB'),
        'memFree': ToSizeInt(mem.free, 'MB'),
        # buffers/cached are not reported on every POSIX platform.
        'memBuffers': ToSizeInt(getattr(mem, 'buffers', 0), 'MB'),
        'memCached': ToSizeInt(getattr(mem, 'cached', 0), 'MB'),
    }
    memInfo['memRealUsed'] = (
        memInfo['memTotal']
        - memInfo['memFree']
        - memInfo['memBuffers']
        - memInfo['memCached']
    )
    total = memInfo['memTotal']
    usedPercent = memInfo['memRealUsed'] / total * 100 if total else 0.0
    memInfo['memUsedPercent'] = usedPercent
    memInfo['menUsedPercent'] = usedPercent  # legacy spelling, kept for consumers
    return memInfo
def GetMemInfoWindows() -> dict:
    '''
    Return memory information on Windows.

    Returns
    -------
    dict
        ``memTotal``, ``memFree`` and ``memRealUsed`` in MB, plus the used
        percentage under ``memUsedPercent``. The misspelled key
        ``menUsedPercent`` is kept with the same value so existing
        consumers keep working.
    '''
    mem = psutil.virtual_memory()
    usedPercent = mem.used / mem.total * 100
    memInfo: dict = {
        'memTotal': ToSizeInt(mem.total, 'MB'),
        'memFree': ToSizeInt(mem.free, 'MB'),
        'memRealUsed': ToSizeInt(mem.used, 'MB'),
        'menUsedPercent': usedPercent,  # legacy spelling
        'memUsedPercent': usedPercent,
    }
    return memInfo
def ToSizeInt(byte: int, target: str) -> int:
    '''
    Convert a byte count to a whole number of the target unit.

    Parameters
    ----------
    byte : int
        Size in bytes.
    target : str
        Target unit: 'KB', 'MB', 'GB' or 'TB'.

    Returns
    -------
    int
        Size in the target unit, truncated toward zero.

    Raises
    ------
    ValueError
        If ``target`` is not one of the supported units.
    '''
    unitOrder = ('KB', 'MB', 'GB', 'TB')
    exponent = unitOrder.index(target) + 1
    divisor = 1024 ** exponent
    return int(byte / divisor)
def ToSizeString(byte: int) -> str:
    '''
    Format a byte count as a human-readable string.

    Parameters
    ----------
    byte : int
        Size in bytes.

    Returns
    -------
    str
        Value with two decimals and a unit, e.g. '6.90 GB'. The largest
        unit is TB, so bigger sizes are expressed as more than 1024 TB.
    '''
    units: tuple = ('b', 'KB', 'MB', 'GB', 'TB')
    value = byte
    # Only step through the units that have a larger successor; dividing
    # once more after reaching TB would understate the size by 1024x.
    for unit in units[:-1]:
        if value < 1024:
            return '{:.2f} {}'.format(value, unit)
        value /= 1024
    return '{:.2f} {}'.format(value, units[-1])
def GetDiskInfo() -> list:
    '''
    Return disk information using the collector for this platform.

    Returns
    -------
    list
        One entry per reported disk; an empty list when the collector
        raises (the error is printed).
    '''
    try:
        reader = GetDiskInfoUnix if UNIX else GetDiskInfoWindows
        return reader()
    except Exception as err:
        print('获取磁盘信息异常(unix: {}):'.format(UNIX), err)
    return []
def GetDiskInfoWindows() -> list:
    '''
    Return disk partition information on Windows.

    Returns
    -------
    list
        One dict per readable partition with ``path`` (forward slashes),
        ``size`` (total/used/free in bytes and percent), ``fstype`` and
        ``inodes`` (always False here).
    '''
    diskInfo: list = []
    for disk in psutil.disk_partitions():
        try:
            usage = psutil.disk_usage(disk.mountpoint)
        except OSError:
            # Partitions whose usage cannot be read are skipped.
            continue
        diskInfo.append({
            # The backslash must be escaped; the unescaped "\" in the
            # original literal was a syntax error.
            'path': disk.mountpoint.replace("\\", "/"),
            'size': {
                'total': usage.total,
                'used': usage.used,
                'free': usage.free,
                'percent': usage.percent
            },
            'fstype': disk.fstype,
            'inodes': False,
        })
    return diskInfo
def GetDiskInfoUnix() -> list:
    '''
    Return disk partition information on Unix, parsed from ``df`` output.

    Returns
    -------
    list
        One dict per kept mount point with ``path``, ``size``
        (df -h columns 2-5) and ``inodes`` (df -i columns 2-5).
    '''
    sizeLines: list = (
        ExecShellUnix("df -h -P|grep '/'|grep -v tmpfs")[0]).split('\n')
    inodeLines: list = (
        ExecShellUnix("df -i -P|grep '/'|grep -v tmpfs")[0]).split('\n')
    # Mount points that are never reported.
    cuts: tuple = (
        '/mnt/cdrom',
        '/boot',
        '/boot/efi',
        '/dev',
        '/dev/shm',
        '/run/lock',
        '/run',
        '/run/shm',
        '/run/user'
    )
    diskInfo: list = []
    for idx, line in enumerate(sizeLines):
        try:
            # The two df listings are matched by line position.
            inodes: list = inodeLines[idx].split()
            disk: list = line.split()
            # Six columns are needed because the mount point is disk[5];
            # the former "< 5" check let five-column lines raise IndexError.
            if len(disk) < 6:
                continue
            # Skip file systems whose size column is given in M or K.
            if 'M' in disk[1] or 'K' in disk[1]:
                continue
            mountPoint = disk[5]
            if len(mountPoint.split('/')) > 10:
                continue
            if mountPoint in cuts:
                continue
            if 'docker' in mountPoint:
                continue
            diskInfo.append({
                'path': mountPoint,
                'size': [disk[1], disk[2], disk[3], disk[4]],
                'inodes': [inodes[1], inodes[2], inodes[3], inodes[4]],
            })
        except Exception as ex:
            print('信息获取错误:', str(ex))
            continue
    return diskInfo
def md5(strings: str) -> str:
    '''
    Return the MD5 hex digest of a string.

    Parameters
    ----------
    strings : str
        Text to hash; it is encoded as UTF-8 first.

    Returns
    -------
    str
        32-character hexadecimal digest.
    '''
    return hashlib.md5(strings.encode('utf-8')).hexdigest()
def GetErrorInfo() -> str:
    '''
    Return the formatted traceback of the exception being handled.

    Returns
    -------
    str
        Text produced by traceback.format_exc().
    '''
    from traceback import format_exc
    return format_exc()
def ExecShellUnix(cmdstring: str, shell=True):
    '''
    Run a command and return its captured output.

    Parameters
    ----------
    cmdstring : str
        Command to execute.
    shell : bool, optional
        Whether to run the command through the shell. The default is True.

    Returns
    -------
    a : str
        Standard output decoded as UTF-8.
    e : str
        Standard error decoded as UTF-8.
        Both are '' when the command could not be started.
    '''
    import subprocess
    # NOTE: with shell=True the string is interpreted by the shell; only
    # pass trusted, program-defined commands.
    try:
        # Capture through pipes instead of temp files in /dev/shm: no
        # dependency on that directory and nothing left open on failure.
        proc = subprocess.run(
            cmdstring,
            close_fds=True,
            shell=shell,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE
        )
    except (OSError, ValueError, subprocess.SubprocessError) as err:
        print(err)
        return '', ''
    # errors='replace' guarantees str results even for non-UTF-8 output.
    a: str = proc.stdout.decode('utf-8', errors='replace')
    e: str = proc.stderr.decode('utf-8', errors='replace')
    return a, e
def GetNetWork() -> dict:
    '''
    Return network totals and the transfer rate since the previous call.

    The previous counters and timestamp are kept in the module cache under
    'up', 'down' and 'otime'.

    Returns
    -------
    dict
        ``up``/``down`` (KB per second since the previous call),
        ``upTotal``/``downTotal`` (bytes sent/received) and
        ``upPackets``/``downPackets``.
    '''
    cache_timeout: int = 86400
    try:
        networkIo = psutil.net_io_counters()[:4]
    except Exception:
        # Best effort: report zeros when the counters cannot be read.
        networkIo = [0, 0, 0, 0]
    sent, received, packetsSent, packetsReceived = networkIo

    otime = cache.get("otime")
    if not otime:
        # First call (or expired cache): seed the baseline.
        otime = time.time()
        cache.set('up', sent, cache_timeout)
        cache.set('down', received, cache_timeout)
        cache.set('otime', otime, cache_timeout)
    ntime = time.time()

    networkInfo: dict = {'up': 0, 'down': 0}
    networkInfo['upTotal'] = sent
    networkInfo['downTotal'] = received

    # Explicit guards replace the former bare ``except: pass``; the rates
    # stay 0 when no time elapsed or a baseline value is missing.
    elapsed = ntime - otime
    prevUp = cache.get("up")
    prevDown = cache.get("down")
    if elapsed > 0:
        if prevUp is not None:
            networkInfo['up'] = round(float(sent - prevUp) / 1024 / elapsed, 2)
        if prevDown is not None:
            networkInfo['down'] = round(
                float(received - prevDown) / 1024 / elapsed, 2)

    networkInfo['downPackets'] = packetsReceived
    networkInfo['upPackets'] = packetsSent

    # Store this reading as the baseline for the next call.
    cache.set('up', sent, cache_timeout)
    cache.set('down', received, cache_timeout)
    cache.set('otime', time.time(), cache_timeout)
    return networkInfo
def GetSystemInfo() -> dict:
    '''
    Return CPU and memory information in one dictionary.

    Returns
    -------
    dict
        ``cpu`` from GetCpuInfo() and ``mem`` from GetMemInfo().
    '''
    return {'cpu': GetCpuInfo(), 'mem': GetMemInfo()}
def GetIoReadWrite() -> Dict[str, int]:
    '''
    Return the disk write and read rates.

    Returns
    -------
    Dict[str, int]
        ``write`` from GetIoWrite() and ``read`` from GetIoRead(); both 0
        when psutil reports no disk counters.
    '''
    ioDisk = psutil.disk_io_counters()
    if ioDisk is None:
        # psutil returns None when there are no disks to report.
        return {'write': 0, 'read': 0}
    ioTotal: dict = {}
    ioTotal['write'] = GetIoWrite(ioDisk.write_bytes)
    ioTotal['read'] = GetIoRead(ioDisk.read_bytes)
    return ioTotal
def GetIoWrite(ioWrite: int) -> int:
    '''
    Return the disk write rate since the previous call.

    The previous counter and timestamp are kept in the module cache under
    'io_write' and 'io_time'.

    Parameters
    ----------
    ioWrite : int
        Current cumulative number of bytes written.

    Returns
    -------
    int
        Bytes written per second (0 on the first call or when the counter
        did not grow). Elapsed times below one second are treated as one.
    '''
    newTime: float = time.time()
    oldWrite = cache.get('io_write')
    if oldWrite is None:
        # First sample: store the timestamp too, so the next call divides
        # by the real elapsed time instead of the 1-second floor.
        cache.set('io_write', ioWrite)
        cache.set('io_time', newTime)
        return 0
    oldTime = cache.get('io_time')
    if not oldTime:
        oldTime = newTime
    ioEnd: int = ioWrite - oldWrite
    timeEnd: float = max(newTime - oldTime, 1)
    diskWrite = ioEnd / timeEnd if ioEnd > 0 else 0
    cache.set('io_write', ioWrite)
    cache.set('io_time', newTime)
    return int(diskWrite) if diskWrite > 0 else 0
def GetIoRead(ioRead: int) -> int:
    '''
    Return the disk read rate since the previous call.

    The previous counter and timestamp are kept in the module cache under
    'io_read' and 'io_read_time'. A dedicated timestamp key is used because
    'io_time' is rewritten by GetIoWrite immediately before this function
    runs in GetIoReadWrite, which made the elapsed time always fall back
    to the 1-second floor.

    Parameters
    ----------
    ioRead : int
        Current cumulative number of bytes read.

    Returns
    -------
    int
        Bytes read per second (0 on the first call or when the counter
        did not grow). Elapsed times below one second are treated as one.
    '''
    newTime: float = time.time()
    oldRead = cache.get('io_read')
    if oldRead is None:
        cache.set('io_read', ioRead)
        cache.set('io_read_time', newTime)
        return 0
    oldTime = cache.get('io_read_time')
    if not oldTime:
        oldTime = newTime
    ioEnd: int = ioRead - oldRead
    timeEnd: float = max(newTime - oldTime, 1)
    diskRead = ioEnd / timeEnd if ioEnd > 0 else 0
    cache.set('io_read', ioRead)
    cache.set('io_read_time', newTime)
    return int(diskRead) if diskRead > 0 else 0
def GetRegValue(key: str, subkey: str, value: str) -> Any:
    '''
    Read a value from the Windows registry.

    Parameters
    ----------
    key : str
        Name of the root key constant in ``winreg``,
        e.g. 'HKEY_LOCAL_MACHINE'.
    subkey : str
        Path of the sub key to open.
    value : str
        Name of the value to query.

    Returns
    -------
    Any
        Data stored under the requested value name.
    '''
    import winreg
    root = getattr(winreg, key)
    # The context manager closes the registry handle, which the previous
    # version left open.
    with winreg.OpenKey(root, subkey) as handle:
        data, _valueType = winreg.QueryValueEx(handle, value)
    return data
def GetSystemVersion() -> str:
    '''
    Return the operating-system version string for this platform.

    Returns
    -------
    str
        Result of GetSystemVersionUnix() when UNIX is true, otherwise of
        GetSystemVersionWindows().
    '''
    resolver = GetSystemVersionUnix if UNIX else GetSystemVersionWindows
    return resolver()
def GetSystemVersionWindows() -> str:
    '''
    Return the Windows version string.

    Returns
    -------
    str
        '<ProductName> (build <CurrentBuildNumber>) <x86|x64> (Py<version>)',
        or a fallback text when the registry cannot be read.
    '''
    try:
        # The PROGRAMFILES(X86) variable is used as the 64-bit indicator.
        bit: str = 'x64' if 'PROGRAMFILES(X86)' in os.environ else 'x86'

        def get(key: str):
            return GetRegValue(
                "HKEY_LOCAL_MACHINE",
                # Raw string: the backslashes are path separators, not escapes.
                r"SOFTWARE\Microsoft\Windows NT\CurrentVersion",
                key
            )

        osName = get('ProductName')
        build = get('CurrentBuildNumber')
        version: str = '{} (build {}) {} (Py{})'.format(
            osName, build, bit, platform.python_version())
        return version
    except Exception as ex:
        print('获取系统版本失败,错误:' + str(ex))
        return '未知系统版本.'
def GetSystemVersionUnix() -> str:
    '''
    Return the Unix distribution version string.

    /etc/redhat-release is preferred; when it is empty or missing the
    first line of /etc/issue is used.

    Returns
    -------
    str
        '<distribution>(Py <major>.<minor>.<micro>)', or a fallback text
        on error.
    '''
    try:
        version: str = readFile('/etc/redhat-release')
        if not version:
            firstLine = readFile('/etc/issue').strip().split("\n")[0]
            # Remove the literal "\n" and "\l" escape codes found in
            # /etc/issue; replacing a real newline here had no effect
            # because the text was already split on newlines.
            version = firstLine.replace('\\n', '').replace('\\l', '').strip()
        else:
            version = version.replace(
                'release ', ''
            ).replace('Linux', '').replace('(Core)', '').strip()
        v = sys.version_info
        return version + '(Py {}.{}.{})'.format(v.major, v.minor, v.micro)
    except Exception as err:
        print('获取系统版本失败,错误:', err)
        return '未知系统版本.'
def GetBootTime() -> dict:
    '''
    Return when the system was booted and how long it has been running.

    Returns
    -------
    dict
        ``timestamp`` (boot time, seconds since the epoch), ``runtime``
        (seconds elapsed since boot) and ``datetime`` (boot time formatted
        as 'YYYY-MM-DD HH:MM:SS' in local time).
    '''
    bootTime: float = psutil.boot_time()
    return {
        'timestamp': bootTime,
        'runtime': time.time() - bootTime,
        # Format the boot time itself; the previous code formatted the
        # current time, which did not match 'timestamp'.
        'datetime': time.strftime(
            '%Y-%m-%d %H:%M:%S', time.localtime(bootTime))
    }
def GetCpuConstants() -> dict:
    '''
    Return the CPU constants of the module-level ``cpuConstants`` object.

    Returns
    -------
    dict
        Value of ``cpuConstants.getDict``.
    '''
    constants: dict = cpuConstants.getDict
    return constants
def GetFullSystemData() -> dict:
    '''
    Collect every metric exposed by this module into one dictionary.

    Returns
    -------
    dict
        The entries of GetSystemInfo() plus ``network``, ``io``, ``boot``
        and ``time`` (current timestamp).
    '''
    systemData: dict = dict(GetSystemInfo())
    systemData['network'] = dict(GetNetWork())
    systemData['io'] = dict(GetIoReadWrite())
    systemData['boot'] = dict(GetBootTime())
    systemData['time'] = time.time()
    return systemData
# Module-level instance read by GetCpuInfo() and GetCpuConstants(); its
# constructor loads the CPU constants at import time.
cpuConstants = CpuConstants()
网页文件:
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<link rel="stylesheet" href="https://stackpath.bootstrapcdn.com/bootstrap/4.1.0/css/bootstrap.min.css" integrity="sha384-9gVQ4dYFwwWSjIDZnLEWnxCjeSWFphJiwGPXr1jddIhOegiu1FwO5qRGvFXOdJZ4" crossorigin="anonymous">
<!-- Chart.js is loaded once, before the inline script at the end of <body> (Chart.bundle 2.7.2); loading 2.7.1 here as well defined the same global twice with mismatched versions. -->
<title>CPU内存曲线监控</title>
</head>
<body>
<!-- Page heading -->
<div class="container">
<h1 class="text-center">米尔MYDJX8使用监控系统</h1>
</div>
<!-- Chart area: each canvas is looked up by id in the inline script below -->
<div class="container">
<div class="row">
<div class="col-sm-6">
<h2 class="text-center">实时监控曲线图</h2>
<!-- Line chart of overall CPU usage -->
<canvas id="lines-graph"></canvas>
</div>
<div class="col-sm-6">
<h2 class="text-center">cpu实时核心占用率</h2>
<!-- Bar chart with one bar per CPU thread -->
<canvas id="bars-graph"></canvas>
</div>
</div>
<div class="row">
<div class="col-sm-6">
<h2 class="text-center">内存占用曲线图</h2>
<!-- Line chart of memory usage -->
<canvas id="lines-ram"></canvas>
</div>
</div>
</div>
<!-- Filled with the CPU model name by the WebSocket message handler -->
<div id="cpuname" style="height:50px;overflow:auto;"></div>
<!-- The next two containers are not written to by the inline script in this file -->
<div id="cpu_used" style="height:50px;overflow:auto;"></div>
<div id="contents" style="height:500px;overflow:auto;"></div>
<!-- Free-text message box; the link calls sendMsg() to send its content over the WebSocket -->
<div>
<textarea id="msg"></textarea>
<a href="javascript:;" onclick="sendMsg()">发送</a>
</div>
<script src="https://cdnjs.cloudflare.com/ajax/libs/Chart.js/2.7.2/Chart.bundle.min.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/jquery/3.6.3/jquery.min.js" integrity="sha512-STof4xm1wgkfm7heWqFJVn58Hm3EtS31XFaagaa8VMReCXAkQnJZ+jEy8PCC/iT18dFy95WcExNHFTqLyp72eQ==" crossorigin="anonymous" referrerpolicy="no-referrer"></script>
<script type="text/javascript">
// WebSocket connection to the monitoring server; the address is hard-coded.
var ws = new WebSocket("ws://192.168.3.77:9000/chat");
// Number of samples pushed into the line charts so far (used to cap the window).
var data_length = 0;
// Not referenced elsewhere in this script.
var timeFormat = 'MM/DD/YYYY HH:mm';
// Returns the current time as "minutes:seconds" (no zero padding); used as the x-axis label.
function getthistime()
{
    const now = new Date();
    return now.getMinutes() + ":" + now.getSeconds();
}
// Handles one server message: parses the payload and refreshes the three charts.
ws.onmessage = function(e) {
    console.log(e.data)
    try {
        // The payload uses single quotes; swap them for double quotes so JSON.parse accepts it.
        var str_json = e.data.replace(new RegExp('\'', "gm"), '"');
        var obj = JSON.parse(str_json);
        // Only touch obj.cpu after confirming it is present.
        if (obj.hasOwnProperty("cpu")) {
            // textContent keeps the name as plain text instead of parsing it as HTML.
            document.getElementById("cpuname").textContent = "CPU型号:" + obj.cpu.cpu_name;

            var _labels = getthistime();
            // Accept the correctly spelled key and fall back to the legacy misspelling.
            var memPercent = obj.mem.memUsedPercent;
            if (memPercent === undefined) {
                memPercent = obj.mem.menUsedPercent;
            }

            if (data_length <= 20) {
                data_length = data_length + 1;
            } else {
                // Window is full: drop the oldest sample from both line charts.
                // (The original referenced an undefined `lineRAMData` here, which
                // threw and stopped every chart from updating.)
                lineChartData.labels.shift();
                lineRAMChartData.labels.shift();
                lineChartData.datasets[0].data.shift();
                lineRAMChartData.datasets[0].data.shift();
            }
            lineChartData.labels.push(_labels);
            lineRAMChartData.labels.push(_labels);
            lineChartData.datasets[0].data.push(obj.cpu.used);
            lineRAMChartData.datasets[0].data.push(memPercent);
            LineChart.update();

            // Rebuild the bar chart: one label and colour per CPU thread.
            var cpu_threads = obj.cpu.cpu_threads;
            barChartData.labels = [];
            barChartData.datasets[0].backgroundColor = [];
            for (var i = 0; i < cpu_threads; i++) {
                barChartData.datasets[0].backgroundColor.push("rgba(75,192,192,1)")
                barChartData.labels.push(i);
            }
            barChartData.datasets[0].data = obj.cpu.used_list;
            BarChart.update();
            LineRAMChart.update();
        }
    } catch(error) {
        console.log("出错:" + error.message);
    }
}
// Data model and chart instance for the overall CPU usage line chart.
var lineChartData = {
    labels: [],
    datasets: [
        {
            label: "总CPU使用率",
            fill: false,
            lineTension: 0.1,
            backgroundColor: "rgba(75,192,192,0.4)",
            borderColor: "rgba(75,192,192,1)",
            borderCapStyle: 'butt',
            borderDash: [],
            borderDashOffset: 0.0,
            borderJoinStyle: 'miter',
            pointBorderColor: "rgba(75,192,192,1)",
            pointBackgroundColor: "#fff",
            pointBorderWidth: 1,
            pointHoverRadius: 5,
            pointHoverBackgroundColor: "rgba(75,192,192,1)",
            pointHoverBorderColor: "rgba(220,220,220,1)",
            pointHoverBorderWidth: 2,
            pointRadius: 1,
            pointHitRadius: 10,
            data: [],
            spanGaps: false,
        },
    ]
};
var ctx = document.getElementById("lines-graph").getContext("2d");
var LineChart = new Chart(ctx, {
    type: 'line',
    data: lineChartData,
    // Chart.js 2.x reads settings from `options`; keys placed directly on the
    // config object are ignored. The 1.x-only `bezierCurve` flag was dropped:
    // curve smoothing is set per dataset through `lineTension` above.
    options: {
        responsive: true
    }
});
// Data model and chart instance for the per-thread CPU usage bar chart.
var barChartData = {
    labels: [],
    datasets: [
        {
            label: "cpu占用率",
            // Initial colours; the message handler replaces this array on every update.
            backgroundColor: [
                'rgba(255, 99, 132, 0.2)',
                'rgba(54, 162, 235, 0.2)',
                'rgba(255, 206, 86, 0.2)',
                'rgba(75, 192, 192, 0.2)',
                'rgba(153, 102, 255, 0.2)',
                'rgba(255, 159, 64, 0.2)'
            ],
            data: [],
        }
    ]
};
var ctx = document.getElementById("bars-graph").getContext("2d");
var BarChart = new Chart(ctx, {
    type: 'bar',
    data: barChartData,
    // Chart.js 2.x reads settings from `options`; a top-level `responsive` key is ignored.
    options: {
        responsive: true
    }
});
// Data model and chart instance for the memory usage line chart.
var lineRAMChartData = {
    labels: [],
    datasets: [
        {
            label: "内存占用曲线",
            fill: false,
            lineTension: 0.1,
            backgroundColor: "rgba(75,192,192,0.4)",
            borderColor: "rgba(75,192,192,1)",
            borderCapStyle: 'butt',
            borderDash: [],
            borderDashOffset: 0.0,
            borderJoinStyle: 'miter',
            pointBorderColor: "rgba(75,192,192,1)",
            pointBackgroundColor: "#fff",
            pointBorderWidth: 1,
            pointHoverRadius: 5,
            pointHoverBackgroundColor: "rgba(75,192,192,1)",
            pointHoverBorderColor: "rgba(220,220,220,1)",
            pointHoverBorderWidth: 2,
            pointRadius: 1,
            pointHitRadius: 10,
            data: [],
            spanGaps: false,
        },
    ]
};
var ctx = document.getElementById("lines-ram").getContext("2d");
var LineRAMChart = new Chart(ctx, {
    type: 'line',
    data: lineRAMChartData,
    // Chart.js 2.x reads settings from `options`; keys placed directly on the
    // config object are ignored. The 1.x-only `bezierCurve` flag was dropped:
    // curve smoothing is set per dataset through `lineTension` above.
    options: {
        responsive: true
    }
});
// Sends the textarea content over the WebSocket, then clears the textarea.
function sendMsg() {
    var $input = $("#msg");
    ws.send($input.val());
    $input.val("");
}
</script>
</body>
</html>
实现效果:
【小结】
通过系统监控,发现系统CPU运行稳定,占用率非常小。