python 串口图形化
python使用MQTT给硬件传输图片的实现方法最近因需要用python写一个微服务来用mqtt给硬件传输图片,其中python用的是flask框架,大概流程如下:
协议为:
需要将图片数据封装成多个消息进行传输,每个消息传输的数据字节数为1400byte。
消息(mqtt payload) 格式:web服务器-------->base:
反馈:base---------> web服务器:
如果web服务器发送完一个“数据传输消息”后,5s内没有收到mqtt“反馈消息”或者收到的反馈中显示“数据包不完整”,则重发该“数据传输消息”。
程序流程图
根据上面的协议,可以得到如下的流程图:
代码如下:
|
# encoding:utf-8 from flask import flask, jsonify from flask_restful import api, resource, reqparse from pil import image from io import bytesio import requests import os, logging, time import paho.mqtt.client as mqtt import struct from flask_cors import * # 日志配置信息 logging.basicconfig( level = logging.info, format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s (runing by %(funcname)s' , ) class mqtt( object ): def __init__( self , img_data, size): self .mqtthost = '*******' self .mqttport = "******" # 订阅和发送的主题 self .topic_from_base = 'mqtttestsub' self .topic_to_base = 'mqtttestpub' self .client_id = time.strftime( '%y%m%d%h%m%s' , time.localtime(time.time())) self .client = mqtt.client( self .client_id) # 完成链接后的回掉函数 self .client.on_connect = self .on_connect # 图片大小 self .size = size # 用于跳出死循环,结束任务 self .finished = none # 包的编号 self .index = 0 # 将收到的图片数据按大小分成列表 self .image_data_list = [img_data[x:x + 1400 ] for x in range ( 0 , self .size, 1400 )] # 记录发布后的数据,用于监控时延 self .pub_time = 0 self .header_to_base = 0xffffeeee self .header_from_base = 0xeeeeffff # 功能标识 self .function_begin = 0x01 self .function_doing = 0x02 self .function_finished = 0x03 # 包的完整和非完整状态 self .whole_package = 0x01 self .bad_package = 0x00 # 头信息的格式,小端模式 self .format_to_base = "<lbhh" self .format_from_base = "<lbhb" # 如果重发包时,用于检查是否重发第一个包 self .first = true # 如果重发包时,用于检查是否重发最后一个包 self .last = false self .begin_data = 'image.jpg" alt="python 串口图形化(python使用MQTT给硬件传输图片的实现方法)" border="0" /> # 链接mqtt服务器函数 def on_mqtt_connect( self ): self .client.connect( self .mqtthost, self .mqttport, 60 ) self .client.loop_start() # 链接完成后的回调函数 def on_connect( self , client, userdata, flags, rc): logging.info( "+++ connected with result code {} +++" . format ( str (rc))) self .client.subscribe( self .topic_from_base) # 订阅函数 def subscribe( self ): self .client.subscribe( self .topic_from_base, 1 ) # 消息到来处理函数 self .client.on_message = self .on_message # 接收到信息后的回调函数 def on_message( self , client, userdata, msg): # 如果接受第一个包则不需要重发第一个 self .first = false # 将接受到的包进行解压,得到一个元组 base_tuple = struct.unpack( self .format_from_base, msg.payload) logging.info( "+++ imagedata's letgth is {}, base_tupe is {} +++" . format ( self .size, base_tuple)) logging.info( "+++ package_number is {}, package_status_from_base is {} +++" . format (base_tuple[ 2 ], base_tuple[ 3 ])) # 检查接受到信息的头部是否正确 if base_tuple[ 0 ] = = self .header_from_base: logging.info( "+++ function_from_base is {} +++" . format (base_tuple[ 1 ])) # 是否完成传输,如果完成则退出 if base_tuple[ 1 ] = = self .function_finished: logging.info( "+++ finish work +++" ) self .finished = 1 self .client.disconnect() else : # 是否是最后一个包 if self .index = = len ( self .image_data_list) - 1 : self .publish( 'finished' , self .function_finished) self .last = true logging.info( "+++ finished_data_to_base is finished+++" ) else : # 如果接收到的包不是 0x03则进行传送数据 if base_tuple[ 1 ] = = self .function_begin or base_tuple[ 1 ] = = self .function_doing: logging.info( "+++ package_number is {}, package_status_from_base is {} +++" . format (base_tuple[ 2 ],base_tuple[ 3 ])) # 如果数据的反馈中,包的状态是1则继续发下一个包 if base_tuple[ 3 ] = = self .whole_package: self .publish( self .index, self .function_doing) logging.info( "+++ data_to_base is finished+++" ) self .index + = 1 # 如果数据的反馈中,包的状态是0则重发数据包 elif base_tuple[ 3 ] = = self .bad_package: re_package_number = base_tuple[ 2 ] self .publish(re_package_number - 1 , self .function_doing) logging.info( "+++ re_data_to_base is finished+++" ) else : logging.info( "+++ package_status_from_base is not 0 or 1 +++" ) self .client.disconnect() else : logging.info( "+++ function_identifier is illegal +++" ) self .client.disconnect() else : logging.info( "+++ header_from_base is illegal +++" ) self .client.disconnect() # 数据发送函数 def publish( self , index, fuc): # 看是否是最后一个包 if index = = 'finished' : length = 0 package_number = 0 data = b'' else : length = len ( self .image_data_list[index]) package_number = index data = self .image_data_list[index] # 打包数据头信息 buffer = struct.pack( self .format_to_base, self .header_to_base, fuc, package_number, length ) to_base_data = buffer + data # mqtt发送 self .client.publish( self .topic_to_base, to_base_data ) self .pub_time = time.time() # 发送第一个包函数 def publish_begin( self ): buffer = struct.pack( self .format_to_base, self .header_to_base, self .function_begin, 0 , len ( self .begin_data.encode( 'utf-8' )), ) begin_data = buffer + self .begin_data.encode( 'utf-8' ) self .client.publish( self .topic_to_base, begin_data) # 控制函数 def control( self ): self .on_mqtt_connect() self .publish_begin() begin_time = time.time() self .pub_time = time.time() self .subscribe() while true: time.sleep( 1 ) # 超过5秒重传 date = time.time() - self .pub_time if date > 5 : # 是否重传第一个包 if self .first = = true: self .publish_begin() logging.info( '+++ this is timeout first_data +++' ) # 是否重传最后一个包 elif self .last = = true: self .publish( 'finished' , self .function_finished) logging.info( '+++ this is timeout last_data +++' ) else : &nb
|