主机监测平台(二) ---数据持久化和websocket
主机监测平台(二) ---数据持久化和websocket
创新实践项目的进阶设计
Socket服务端编写
一开始是选Websocket的,但是中途遇到一点问题(端口、心跳),然后转战Socket
。。。结果还是换回Websocket了
-
订阅redis
# 创建一个 Redis 客户端连接 redis_client = redis.StrictRedis(host='106.54.213.167', port=6380, db=0) # 订阅频道 pubsub = redis_client.pubsub() pubsub.subscribe('monitor_data')
-
多线程连接
其实一开始的多线程连接应该也是可以的,但是由于我把订阅放在了handle_client 外部
导致所有客户端连接会共享同一个
pubsub
对象当有消息到达时,多个连接都在等待接收消息,导致消息堆积,从而产生粘包问题。
# 建立连接 client_socket, addr = server_socket.accept() # 使用多线程处理连接 client_thread = threading.Thread(target=handle_client, args=(client_socket, addr)) client_thread.start()
-
设置分隔符( ‘\n')
由于线程的数据量比较大,导致偶尔会出现粘包问题
for message in pubsub.listen(): if message['type'] == 'message': data = message['data'] message_separator = b'\n' try: client_socket.send(data+message_separator)
tips: 由于 redis 订阅的数据本身就是字节流,所以加分隔符 b'\n'
-
(客户端测试)
注意编写缓存区,判别分隔符
message_separator = '\n' buffer = '' while True: response = client_socket.recv(1024) if not response: break try: response_str = response.decode('utf-8') buffer += response_str while message_separator in buffer: message, buffer = buffer.split(message_separator, 1) data_dict = json.loads(message) # 处理接收到的字典消息 print(data_dict)
Websocket服务端编写
HamonyOs 好像SDK暂不支持Socket的跨平台连接(maybe
再回来编写websocket
-
订阅redis(如Socket)
-
启动websocket服务端
# 启动 WebSocket 服务器 start_server = websockets.serve(handle_client, '0.0.0.0', 6666)
-
连接业务逻辑
内部的logging库,可以用来调试,方便找出bug
async def handle_client(websocket, path): # 添加客户端到连接集合 connected_clients.add(websocket) try: # 创建一个 Redis 客户端连接 redis_client = redis.StrictRedis(host='106.54.213.167', port=6380, db=0) # 订阅频道 pubsub = redis_client.pubsub() pubsub.subscribe('monitor_data') for message in pubsub.listen(): if message['type'] == 'message': data = message['data'] message_separator = b'\n' # 打印订阅的 Redis 数据 # logging.info(f"Received data from Redis: {data}") # 将数据发送给所有已连接的客户端 for client in connected_clients: try: # 发送数据 await client.send(data+message_separator) # 响应Ping帧 await client.ping() # 响应Pong帧 await client.pong() except websockets.exceptions.ConnectionClosedError as e: logging.error(f"Connection error: {e}") # 从连接集合中移除断开的客户端 connected_clients.remove(client) except websockets.exceptions.ConnectionClosedError as e: logging.error(f"Connection error: {e}") finally: # 从连接集合中移除客户端 connected_clients.remove(websocket)
-
分包问题
其实,如果客户端也用python的 websockets库,是不会出现粘包问题的,因为每个数据帧都有标识头
但由于HamonyOs本身对数据长度有限制,导致出现了分包问题,解决方案如Socket,添加分隔符
tips: docker-compose 部署暂时有些小问题(好像是 服务部署不能用 0.0.0.0 得用 ‘微服务名称’,先用docker部署
SQLite数据库设计
SQLite轻量化数据库便于存储小型数据集,由于业务需要部分数据进行持久化存储,所以采用SQLite存储部分数据
- 设计存储结构
# 用户表
conn.execute('''
CREATE TABLE IF NOT EXISTS user_data (
user_id INTEGER PRIMARY KEY,
ip TEXT
)
''')
# 创建监测数据表
conn.execute('''
CREATE TABLE IF NOT EXISTS monitoring_data (
ip TEXT,
id INTEGER PRIMARY KEY,
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
processes_num INTEGER,
memory_percent DOUBLE,
network_sent INTEGER,
network_recv INTEGER,
network_loss DOUBLE,
cpu_usage DOUBLE
)
''')
# 创建进程数据表
conn.execute('''
CREATE TABLE IF NOT EXISTS process_data (
id INTEGER PRIMARY KEY,
data TEXT,
monitoring_data_id INTEGER,
FOREIGN KEY (monitoring_data_id) REFERENCES monitoring_data (id)
)
''')
仔细想了一下,感觉用户表不能放在机子上,应该放在后端的数据库上,然后机子本身的SQLite就存好持久化数据
-
订阅redis(如上)
-
存储
我试图将进程数据与其他数据分开存储,但由于进程数据实在太大太详细,导致我数据存储时,增长率为24KB/S
估算了一下,1GB够我存12h的数据,这个存储效率显然是不够的( 后期优化结构吧 )
-
获取当前设备的IP(可能会拿到内网的
(这里的IP其实可以不用存储(让用户自己输入设备IP,我们订阅就行
-
订阅该设备的redis数据库
-
存入SQLite
cursor = conn.cursor() cursor.execute('''insert into monitoring_data (ip, processes_num, memory_percent, network_sent, network_recv, network_loss, cpu_usage) VALUES (?,?,?,?,?,?,?)''', (public_ip, len(process_data), memory_data, sent_data, recv_data, loss_data, cpu_data)) valid_processes = [] # 获取刚刚插入的数据的ID monitoring_data_id = cursor.lastrowid for item in process_data : pro_mem = item['memory_percent'] pro_cpu = item['cpu_percent'] if pro_cpu > 0 or pro_mem > 0 : # print(f"cpu {pro_cpu} mem {pro_mem}") # valid_processes.append(item) # 插入有效进程数据 cursor.execute("INSERT INTO process_data (data, monitoring_data_id) VALUES (?, ?)", (json.dumps(item), monitoring_data_id))
-
-
定期清理
这里一开始我是打算使用 datetime库,然后获取当前时间进行比较
结果发现SQLite本身有datetime函数,直接进行比较
# 8h为时区差,-12则记录4h数据 cursor.execute("SELECT id FROM monitoring_data WHERE timestamp < datetime('now', 'localtime', '-12 hour')") data_to_delete = cursor.fetchall() # 删除process_data表中与一天前监测数据ID不匹配的记录 for row in data_to_delete: cursor.execute("DELETE FROM process_data WHERE monitoring_data_id = ?", (row[0],)) conn.commit() cursor.execute("delete from monitoring_data WHERE timestamp < datetime('now', 'localtime', '-15 day')") conn.commit()
总体业务逻辑
- 某用户在设备上部署微服务(监测、redis、socket、ws、SQLite) (kafka暂时不考虑
- 部署成功后,在页面中添加机器ip
- 数据展示
- 实时数据 : 调redis(后续可以先调SQLiIte,取出所有设备IP,然后订阅所有设备数据,存入kafka,然后读取消息队列
- 历史数据 : 调SQLite
- (预警
项目仓库
README.md含部署教程
参考资料
消息队列Kafka、RocketMQ、RabbitMQ的优劣势比较
每日小知识:
消息队列:
常见的有:Kafka、RabbitMQ
本质作用:解耦,分离上下游业务,类似前后端分离的理念。
上游只需要处理好业务将消息(控制消息或者数据)--- 传入消息队列(完成工作流程)---下游收到消息后进行相应操作
优点:
- 解耦使应用本身上下游可以各自扩展,而不影响对方工作
- 本身具有异步性,所以可以实现流量削峰,缓冲大量瞬时流量
tips:
早点做准备,努力赚钱