主机监测平台(二) ---数据持久化和websocket

创新实践项目的进阶设计

Socket服务端编写

一开始是选Websocket的,但是中途遇到一点问题(端口、心跳),然后转战Socket

。。。结果还是换回Websocket了

  1. 订阅redis

    # 创建一个 Redis 客户端连接
    redis_client = redis.StrictRedis(host='106.54.213.167', port=6380, db=0)
    # 订阅频道
    pubsub = redis_client.pubsub()
    pubsub.subscribe('monitor_data')
    
  2. 多线程连接

    其实一开始的多线程连接应该也是可以的,但是由于我把订阅放在了handle_client 外部

    导致所有客户端连接会共享同一个 pubsub 对象

    当有消息到达时,多个连接都在等待接收消息,导致消息堆积,从而产生粘包问题。

     # 建立连接
    client_socket, addr = server_socket.accept()
    # 使用多线程处理连接
    client_thread = threading.Thread(target=handle_client, args=(client_socket, addr))
    client_thread.start()
    
  3. 设置分隔符( ‘\n')

    由于线程的数据量比较大,导致偶尔会出现粘包问题

    for message in pubsub.listen():
        if message['type'] == 'message':
            data = message['data']
            message_separator = b'\n'
            try:
                client_socket.send(data+message_separator)
    

    tips: 由于 redis 订阅的数据本身就是字节流,所以加分隔符 b'\n'

  4. (客户端测试)

    注意编写缓存区,判别分隔符

    message_separator = '\n'
    buffer = ''
        while True:
            response = client_socket.recv(1024)
            if not response:
                break
            try:
                response_str = response.decode('utf-8')
                buffer += response_str
                while message_separator in buffer:
                    message, buffer = buffer.split(message_separator, 1)
                    data_dict = json.loads(message)
                    # 处理接收到的字典消息
                    print(data_dict)
    

Websocket服务端编写

HamonyOs 好像SDK暂不支持Socket的跨平台连接(maybe

再回来编写websocket

  1. 订阅redis(如Socket)

  2. 启动websocket服务端

    # 启动 WebSocket 服务器
    start_server = websockets.serve(handle_client, '0.0.0.0', 6666)
    
  3. 连接业务逻辑

    内部的logging库,可以用来调试,方便找出bug

    async def handle_client(websocket, path):
        # 添加客户端到连接集合
        connected_clients.add(websocket)
        try:
            # 创建一个 Redis 客户端连接
            redis_client = redis.StrictRedis(host='106.54.213.167', port=6380, db=0)
    
            # 订阅频道
            pubsub = redis_client.pubsub()
            pubsub.subscribe('monitor_data')
    
            for message in pubsub.listen():
                if message['type'] == 'message':
                    data = message['data']
                    message_separator = b'\n'
                    # 打印订阅的 Redis 数据
                    #  logging.info(f"Received data from Redis: {data}")
                    # 将数据发送给所有已连接的客户端
                    for client in connected_clients:
                        try:
                            # 发送数据
                            await client.send(data+message_separator)
                            # 响应Ping帧
                            await client.ping()
                            # 响应Pong帧
                            await client.pong()
                        except websockets.exceptions.ConnectionClosedError as e:
    
                            logging.error(f"Connection error: {e}")
                            # 从连接集合中移除断开的客户端
                            connected_clients.remove(client)
        except websockets.exceptions.ConnectionClosedError as e:
            logging.error(f"Connection error: {e}")
        finally:
            # 从连接集合中移除客户端
            connected_clients.remove(websocket)
    
  4. 分包问题

    其实,如果客户端也用python的 websockets库,是不会出现粘包问题的,因为每个数据帧都有标识头

    但由于HamonyOs本身对数据长度有限制,导致出现了分包问题,解决方案如Socket,添加分隔符

tips: docker-compose 部署暂时有些小问题(好像是 服务部署不能用 0.0.0.0 得用 ‘微服务名称’,先用docker部署


SQLite数据库设计

SQLite轻量化数据库便于存储小型数据集,由于业务需要部分数据进行持久化存储,所以采用SQLite存储部分数据

  1. 设计存储结构

image-20231101101210023.png

# 用户表
conn.execute('''
CREATE TABLE IF NOT EXISTS user_data (
    user_id INTEGER PRIMARY KEY,
    ip TEXT
)
''')
# 创建监测数据表
conn.execute('''
CREATE TABLE IF NOT EXISTS monitoring_data (
    ip TEXT,
    id INTEGER PRIMARY KEY,
    timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
    processes_num INTEGER,
    memory_percent DOUBLE,
    network_sent INTEGER,
    network_recv INTEGER,
    network_loss DOUBLE,
    cpu_usage DOUBLE
)
''')

# 创建进程数据表
conn.execute('''
CREATE TABLE IF NOT EXISTS process_data (
    id INTEGER PRIMARY KEY,
    data TEXT,
    monitoring_data_id INTEGER,
    FOREIGN KEY (monitoring_data_id) REFERENCES monitoring_data (id)
)
''')

仔细想了一下,感觉用户表不能放在机子上,应该放在后端的数据库上,然后机子本身的SQLite就存好持久化数据

  1. 订阅redis(如上)

  2. 存储

    我试图将进程数据与其他数据分开存储,但由于进程数据实在太大太详细,导致我数据存储时,增长率为24KB/S

    估算了一下,1GB够我存12h的数据,这个存储效率显然是不够的( 后期优化结构吧 )

    • 获取当前设备的IP(可能会拿到内网的

      (这里的IP其实可以不用存储(让用户自己输入设备IP,我们订阅就行

    • 订阅该设备的redis数据库

    • 存入SQLite

    cursor = conn.cursor()
                cursor.execute('''insert into monitoring_data
                            (ip, processes_num, memory_percent, network_sent, network_recv, network_loss, cpu_usage)
                            VALUES (?,?,?,?,?,?,?)''',
                               (public_ip, len(process_data), memory_data, sent_data, recv_data, loss_data, cpu_data))
                valid_processes = []
                # 获取刚刚插入的数据的ID
                monitoring_data_id = cursor.lastrowid
    
                for item in process_data :
                    pro_mem = item['memory_percent']
                    pro_cpu = item['cpu_percent']
                    if pro_cpu > 0 or pro_mem > 0 :
                        # print(f"cpu {pro_cpu} mem {pro_mem}")
                        # valid_processes.append(item)
                        # 插入有效进程数据
                        cursor.execute("INSERT INTO process_data (data, monitoring_data_id) VALUES (?, ?)",
                                       (json.dumps(item), monitoring_data_id))
    
    
  3. 定期清理

    这里一开始我是打算使用 datetime库,然后获取当前时间进行比较

    结果发现SQLite本身有datetime函数,直接进行比较

     # 8h为时区差,-12则记录4h数据
                cursor.execute("SELECT id FROM monitoring_data WHERE timestamp < datetime('now', 'localtime', '-12 hour')")
                data_to_delete = cursor.fetchall()
                # 删除process_data表中与一天前监测数据ID不匹配的记录
                for row in data_to_delete:
                    cursor.execute("DELETE FROM process_data WHERE monitoring_data_id = ?", (row[0],))
                    conn.commit()
                cursor.execute("delete from monitoring_data WHERE timestamp < datetime('now', 'localtime', '-15 day')")
                conn.commit()
    

总体业务逻辑

  1. 某用户在设备上部署微服务(监测、redis、socket、ws、SQLite) (kafka暂时不考虑
  2. 部署成功后,在页面中添加机器ip
  3. 数据展示
    • 实时数据 : 调redis(后续可以先调SQLiIte,取出所有设备IP,然后订阅所有设备数据,存入kafka,然后读取消息队列
    • 历史数据 : 调SQLite
  4. (预警

项目仓库

gitee仓库

README.md含部署教程


参考资料

Python常用库之psutil使用指南

消息队列Kafka、RocketMQ、RabbitMQ的优劣势比较

什么是流量削峰?如何解决秒杀业务的削峰场景


每日小知识:

消息队列:

常见的有:Kafka、RabbitMQ

本质作用:解耦,分离上下游业务,类似前后端分离的理念。

上游只需要处理好业务将消息(控制消息或者数据)--- 传入消息队列(完成工作流程)---下游收到消息后进行相应操作

优点:

  • 解耦使应用本身上下游可以各自扩展,而不影响对方工作
  • 本身具有异步性,所以可以实现流量削峰,缓冲大量瞬时流量

tips:

早点做准备,努力赚钱

文章作者: P4ul
本文链接:
版权声明: 本站所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 打工人驿站
大数据 数据库 Docker python gitee SQLite
喜欢就支持一下吧
打赏
微信 微信
支付宝 支付宝