设备使用python连接阿里Iot
物联网(IoT)设备连接家庭设备,实现智能化管理 #生活常识# #科技应用#
简单的说一下mqtt原理
发布者(Publisher)、代理(Broker,服务器)、订阅者(Subscriber)。发布者和订阅者都是客户端
阿里iot学习路线: https://help.aliyun.com/learn/learningpath/iot.html?spm=5176.229363.1219927.btn3.7d431449twQUhs
个人觉得 阿里iot平台 消息传输可以分为: 设备端, 服务订阅端, 云端api 这样更恰当
阿里的iot平台是收费的,连接时长费,消费通讯费,(马爸爸说:不赚钱的企业是不道德的),
所以我想,最简单的方法使用mqtt+http,服务订阅端不要了,而不是全部使用mqtt,mqtt的作用就是当年的BB机.
服务器通过iot平台通知设备端:服务器呼叫你,请回电!然后设备端发起post/get请求,然后两边就可以通讯了!
只是一种想法,毕竟http协议是开放的,安全性不是那么好,还有性能开销等问题,唯一的好处是不要钱!
以下代码环境:
python 3.7
win10
centos7_64
设备端:(python3.7 + win10)
设备端连接阿里的iot平台有两种方式:
1,使用开源的mqtt协议连接阿里云iot
参考: https://blog.csdn.net/weixin_34233421/article/details/90418854
https://blog.csdn.net/S_L_zheng/article/details/81736107
https://yq.aliyun.com/articles/721378 (这个只能做辅助)
代码见文章尾部
2,使用阿里的linkkit SDK
参考: https://blog.csdn.net/qq_19408097/article/details/100862733
这是两种连接阿里iot平台的方法都是可以的
连接上之后都可以使用控制台上的自定义topic消息发送消息测试,但是设备订阅端是收不到自己发的消息的
代码见文章尾部
服务端订阅: (python3.7 + centos7_64+ AMQP服务端订阅 )
就是放在服务器上接收设备发送的消息的
本来想在win10上进行测试,尝试多次,不成功!在centos上 一次成功:
参考: https://blog.csdn.net/hamanwendy/article/details/104450338
https://blog.csdn.net/weixin_41869763/article/details/103910017
https://help.aliyun.com/document_detail/143597.html?spm=a2c4g.11186623.6.620.1da47fe4K7zCZu
我是这么干的:
wget http://archive.apache.org/dist/qpid/proton/0.29.0/qpid-proton-0.29.0.tar.gz
tar zxvf qpid-proton-0.29.0.tar.gz
yum install gcc gcc-c++ make cmake libuuid-devel
yum install openssl-devel
yum install cyrus-sasl-devel cyrus-sasl-plain cyrus-sasl-md5
yum install swig
yum install python-devel
yum install python-sphinx
cd qpid-proton-0.29.0/
mkdir build
cd build
cmake … -DCMAKE_INSTALL_PREFIX=/usr -DSYSINSTALL_BINDINGS=ON
sudo make install
pip install python-qpid-proton
测试:
python
import proton
print(’%s’ % ‘SSL present’ if proton.SSL.present() else ‘SSL NOT AVAILABLE’)
显示: SSL present
代码见文章尾部
云端调用api发消息给设备:
参考:
https://blog.csdn.net/weixin_43368807/article/details/83026434’
https://help.aliyun.com/document_detail/30579.html?spm=5176.11842913.1177534.34.178e6e69WFKrUc
https://help.aliyun.com/document_detail/42700.html?spm=a2c4g.11186623.2.17.683c3d66Klq7j2#reference-qdh-ywb-zdb
代码见文章尾部(能通,但没解析)
以下均为代码:
开源mqtt设备端:
# encoding: utf-8 import hmac import hashlib import paho.mqtt.client as mqtt ProductKey = '********' ClientId = "12345" # 自定义clientId DeviceName = "led1" DeviceSecret = "*****************************************" signmethod = 'hmacsha1' timestamp = '119345' mqttHostUrl = ProductKey + ".iot-as-mqtt.cn-shanghai.aliyuncs.com" mqttHostPort = 1883 mqttClientId = ClientId + '|securemode=3,signmethod=' + signmethod + ',timestamp=' + timestamp + '|' mqttUsername = DeviceName + '&' + ProductKey def get_mqtt_password(): data = "".join(("clientId", ClientId, "deviceName", DeviceName, "productKey", ProductKey, "timestamp", timestamp )) print(data) if "hmacsha1" == signmethod: ret = hmac.new(bytes(DeviceSecret, encoding="utf-8"), bytes(data, encoding="utf-8"), hashlib.sha1).hexdigest() return ret elif "hmacmd5" == signmethod: ret = hmac.new(bytes(DeviceSecret, encoding="utf-8"), bytes(data, encoding="utf-8")) return ret mqttPassword = get_mqtt_password() mqttc = mqtt.Client(mqttClientId) # 连接 def on_mqtt_connect(): print("mqttClientId:", mqttClientId) print("mqttUsername:", mqttUsername) print("mqttPassword:", mqttPassword) print("mqttHostUrl:", mqttHostUrl) mqttc.username_pw_set(mqttUsername, mqttPassword) mqttc.connect(mqttHostUrl, mqttHostPort, 60) # publish (发布)消息 def on_publish(topic, payload, qos): mqttc.publish(topic, payload, qos) # subscribe 订阅消息 def on_subscribe(): mqttc.subscribe(ProductKey+"/"+DeviceName+"/user/update", 1) mqttc.on_message = on_message_come # 消息到来处理函数 # 消息处理函数 def on_message_come(lient, userdata, msg): print('消息接收', msg.topic + " " + ":" + str(msg.payload)) if __name__ == '__main__': on_mqtt_connect() on_subscribe() mqttc.loop_forever()
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869linkkit设备端:
from linkkit import linkkit import time # 连接阿里云 def on_connect(session_flag, rc, userdata): print("on_connect:%d,rc:%d,userdata:" % (session_flag, rc)) pass def on_subscribe_topic(mid, granted_qos, userdata): # 订阅topic print("on_subscribe_topic mid:%d, granted_qos:%s" % (mid, str(','.join('%s' % it for it in granted_qos)))) pass # 接收云端的数据 def on_topic_message(topic, payload, qos, userdata): print("on_topic_message:" + topic + " payload:" + str(payload) + " qos:" + str(qos)) print("阿里云上传回的数值是:", str(payload)[2:-1]) def on_unsubscribe_topic(mid, userdata): print("on_unsubscribe_topic mid:%d" % mid) pass # 设置连接参数,方法为“一机一密”型 lk = linkkit.LinkKit( host_name="cn-shanghai", product_key="***************************************", device_name="led1", device_secret="****************************") if __name__ == '__main__': # 注册接收到云端数据的方法 lk.on_connect = on_connect # 注册云端订阅的方法 lk.on_subscribe_topic = on_subscribe_topic # 注册当接受到云端发送的数据的时候的方法 lk.on_topic_message = on_topic_message # 注册取消云端订阅的方法 lk.on_unsubscribe_topic = on_unsubscribe_topic # 连接阿里云的函数(异步调用) lk.connect_async() # 因为他是他是异步调用需要时间所以如果没有这个延时函数的话,他就会出现not in connected state的错误 time.sleep(2) # 订阅这个topic,不需要写prodect_key和device_name rc, mid = lk.subscribe_topic(lk.to_full_topic("user/get")) if rc == 0: print("subscribe multiple topics success:%r, mid:%r" % (rc, mid)) else: print("subscribe multiple topics fail:%d" % rc) pass while True: a = input("你想要将什么数值传到阿里云上去?") # 调用数据上传的函数,将string类的a上传到阿里云上去 rc, mid = lk.publish_topic(lk.to_full_topic("user/update"), str(a)) if rc == 0: print("publish topic success:%r, mid:%r" % (rc, mid)) else: print("publish topic fail:%d" % rc)
1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859AMQP服务端
# # encoding=utf-8 import sys import logging import time from proton.handlers import MessagingHandler from proton.reactor import Container import hashlib import hmac import base64 reload(sys) sys.setdefaultencoding('utf-8') logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) console_handler = logging.StreamHandler(sys.stdout) uid = '*******************************' accessKey = '********************************' accessSecret = *******************************************' regionId = 'cn-shanghai' consumerGroupId = 'DEFAULT_GROUP' iotInstanceId = "" clientId = "12345" def current_time_millis(): return str(int(round(time.time() * 1000))) def do_sign(secret, sign_content): m = hmac.new(secret, sign_content, digestmod=hashlib.sha1) return base64.b64encode(m.digest()) class AmqpClient(MessagingHandler): def __init__(self): super(AmqpClient, self).__init__() def on_start(self, event): url = "amqps://" + uid + ".iot-amqp." + regionId + ".aliyuncs.com" signMethod = "hmacsha1" timestamp = current_time_millis() # 按照阿里云IoT规范,组装UserName。 userName = clientId + '|authMode=aksign' + ',signMethod=' + signMethod + ',timestamp=' + timestamp + ',authId=' + accessKey + ',iotInstanceId=' + iotInstanceId + ',consumerGroupId=' + consumerGroupId + '|' signContent = "authId=" + accessKey + "×tamp=" + timestamp # 按照阿里云IoT规范,计算签名,组装password。 passWord = do_sign(accessSecret.encode("utf-8"), signContent.encode("utf-8")) conn = event.container.connect(url, user=userName, password=passWord, heartbeat=60) self.receiver = event.container.create_receiver(conn) # 当连接成功建立被调用。 def on_connection_opened(self, event): logger.info("Connection established, remoteUrl: %s", event.connection.hostname) # 当连接关闭时被调用。 def on_connection_closed(self, event): logger.info("Connection closed: %s", self) # 当远端因错误而关闭连接时被调用。 def on_connection_error(self, event): logger.info("Connection error") # 当建立AMQP连接错误时被调用,包括身份验证错误和套接字错误。 def on_transport_error(self, event): if event.transport.condition: if event.transport.condition.info: logger.error("%s: %s: %s" % ( event.transport.condition.name, event.transport.condition.description, event.transport.condition.info)) else: logger.error("%s: %s" % (event.transport.condition.name, event.transport.condition.description)) else: logging.error("Unspecified transport error") # 当收到消息时被调用。 def on_message(self, event): message = event.message content = message.body.decode('utf-8') topic = message.properties.get("topic") message_id = message.properties.get("messageId") print("receive message: message_id=%s, topic=%s, content=%s" % (message_id, topic, content)) event.receiver.flow(1) Container(AmqpClient()).run()
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485云端Api调用 给指定设备发送消息:
#!/usr/bin/env python # coding=utf-8 from aliyunsdkcore.client import AcsClient from aliyunsdkcore.request import CommonRequest uid = '**********************' accessKey = '******************************' accessSecret = '******************************' regionId = 'cn-shanghai' consumerGroupId = 'DEFAULT_GROUP' iotInstanceId = "" clientId = "12345" DeviceName = "led1" ProductKey = 'a1r2MPi95bL' sendData = '1234' client = AcsClient(accessKey, accessSecret, regionId) request = CommonRequest() request.set_accept_format('json') request.set_domain('iot.cn-shanghai.aliyuncs.com') request.set_method('POST') request.set_protocol_type('https') # https | http request.set_version('2018-01-20') request.set_action_name('RRpc') request.add_query_param('RegionId', regionId) request.add_query_param('DeviceName', DeviceName) request.add_query_param('Timeout', "3000") request.add_query_param('RequestBase64Byte', sendData) request.add_query_param('ProductKey', ProductKey) if __name__ == '__main__': response = client.do_action_with_exception(request) # python2: print(response) print(str(response, encoding='utf-8'))
12345678910111213141516171819202122232425262728293031323334353637相对比市场上众多的云产品,其实阿里的相对来说比较贵的,
我不是专门做物联网的,只是觉得以后可能会用到这些玩意,所以尝试了一些,
以后更多的,可能会在python 和树莓派上吧! 树莓派应该可以发挥出很牛逼的功能
网址:设备使用python连接阿里Iot https://www.yuejiaxmz.com/news/view/122779
相关内容
WIFI设备接入阿里云物联网平台【阿里云生活物联网架构师专题 ③】esp32 sdk 直连接入天猫精灵IOT开放平台,实现天猫精灵找队友零配网功能和语音控制;
如何使用蓝牙设备端SDK用户编程接口
阿里云物联网平台学习(二)之场景联动
【Python】Python连接Hadoop数据中遇到的各种坑(汇总)
设备接入阿里物联网云步骤
终端设备通过MQTT协议上传到阿里云或华为云上,云上如何创建设备及传输数据
智能健康监测:如何利用 IoT 和 AI 提高生活质量
【阿里云生活物联网架构师专题 ⑥】ESP8266接入阿里生活飞燕平台国际版,实现亚马逊Alexa Echo音响语音控制;
基于阿里云物联网平台