第一句子网 - 唯美句子、句子迷、好句子大全
第一句子网 > 阿里云物联网平台最完全的使用教程

阿里云物联网平台最完全的使用教程

时间:2020-07-19 06:58:50

相关推荐

阿里云物联网平台最完全的使用教程

简介:包括内容如下(详细到每一个细节和步骤,如果还不清楚,可以进入阿里云控制台创建工单,请教阿里的工程师) 使用环境:(使用蜂窝网进行过测试,和WiFi直连并无差别,可以直接使用)

一、阿里云账号说明

1、基本功能说明

进入阿里云官网创建主账号

/?spm=a2c4g.11186623.amxosvpfn.2.15f5293ewZtPYC

创建完成之后,进入控制台并选择Access Key管理,如图所示

创建子账号,并将物联网平台的所有权限给予子账号,以后我们就用子账号进行各类操作,注意保存得到的三元组,这是接入物联网平台的关键之一

2、开通物联网服务

/product/iot?spm=5176.10695662.J_3717714080.2.1ce83318Gaytdw

选择开通即可,前两个月赠送的免费消息足够用了,选择进入管理控制台

https://iot./lk/summary

指出来的这几个是需要用到的功能

二、物联网平台的基本使用

1、创建产品,如下图

2、添加设备

三、设备接入物联网平台

1、开发环境设置

/document_detail/98292.html?spm=a2c4g.11186623.6.683.7d5b1f19UYzxqv

环境win10,pycharm,python3.8(Ubuntu16 64-bit和Ubuntu18 arm架构同理)

(1)python3.8的安装和pycharm的安装略过(python3.8需要安装pip下载工具)

(2)环境配置

无需参考官方文档配置虚拟环境,直接用pycharm就好了

直接win+R进入win10命令行控制环境

输入:

pip install aliyun-iot-linkkit

2、连接

/document_detail/98293.html?spm=a2c4g.11186623.6.684.61c84912ccMTDp

使用一机一密方式进行

from linkkit import linkkitimport sys#一机一密认证lk = linkkit.LinkKit(host_name="cn-shanghai",product_key="aG*******k",device_name="Test1",device_secret="****************")#连接上物联网平台后的回调,成功连接session_flag和rc返回0def onconnect(sessionflag, rc, userdata):print("onconnect:%d,rc:%d:" % (sessionflag, rc))pass#断开连接物联网平台后的回调,断开后rc返回1def on_disconnect(rc, userdata):print("on_disconnect:rc:%d:" % rc)#当出现网络波动时,程序会自动循环调用连接,显示的效果为这两个回调函数会被一直调用lk.onconnect = onconnectlk.ondisconnect = ondisconnectlk.connect_async() # 连接物联网平台lk.startworkerloop() # 保持连接print("connected")

注意

1、lk后面的是你创建产品设备对应的三元组|

2、注意所有的回调函数放在连接之前,程序会一直执行,只要出现相应的操作回调函数就会被调用,即只要连接上时,就输出rc=0,只要断开连接时,就输出rc=1 |

3、自定义MQTT通信

(1)创建自定义的Topic(注意:Topic的权限与代码中的函数要一一对应,例如权限为订阅,那么在通信时选择的应该是subscribe回调,可以接收到云端消息,发布同理)

Topic的名字是作为通信的凭证

(2)实现(首先需要连接上阿里云物联网平台,再构造逻辑进行相应操作,/document_detail/98295.html?spm=a2c4g.11186623.6.685.6d596dc9OWMDE9)

from linkkit import linkkitimport sysimport time#一机一密认证lk = linkkit.LinkKit(host_name="cn-shanghai",product_key="a*************Gk",device_name="Test1",device_secret="****************************")#连接上物联网平台后的回调,成功连接session_flag和rc返回0def onconnect(sessionflag, rc, userdata):print("onconnect:%d,rc:%d:" % (sessionflag, rc))pass#断开连接物联网平台后的回调,断开后rc返回1def on_disconnect(rc, userdata):print("on_disconnect:rc:%d:" % rc)#订阅topic回调def onsubscribetopic(mid, granted_qos, userdata):print("onsubscribetopic mid:%d, granted_qos:%s" %(mid, str(','.join('%s' % it for it in granted_qos))))pass#取消订阅回调def onunsubscribetopic(mid, userdata):print("onunsubscribetopic mid:%d" % mid)pass#接收消息回调,调用函数时会一直执行print,效果为:如果调用该方法,发布消息的topic每次发布消息都会被输出到控制台并打印出来def ontopicmessage(topic, payload, qos, userdata):print("ontopicmessage:" + topic + " payload:" + str(payload) + " qos:" + str(qos))pass#当出现网络波动时,程序会自动循环调用连接,显示的效果为这两个回调函数会被一直调用lk.onconnect = onconnectlk.ondisconnect = ondisconnectlk.onsubscribetopic = onsubscribetopic # 订阅topic回调lk.onunsubscribetopic = onunsubscribetopic # 取消订阅topic回调lk.ontopicmessage = ontopicmessage # 接收topic消息回调lk.connect_async() # 连接物联网平台lk.startworkerloop() # 保持连接print("connected")#增加while循环的作用:保证物联网平台是连接上之后再进行通信的while True:try:msg = input() # 获取从控制台的输入except KeyboardInterrupt:sys.exit()else:if msg == "1":lk.disconnect()elif msg == "2":lk.connect_async()elif msg == "3": # 输入为3时,订阅get这个topic,每个订阅的topic只需订阅一次即可,会在物联网平台的设备topic列表中显示rc, mid = lk.subscribetopic(lk.tofull_topic("user/get")) # 注意topic只需要写成这样的格式即可,格式需要完全一致,全称会自动补全,不需要输入设备名if rc == 0: # rc返回值为0时则表示订阅成功print("subscribe topic success:%r, mid:%r" % (rc, mid))else:print("subscribe topic fail:%d" % rc)elif msg == "4": # 取消订阅rc, mid = lk.unsubscribetopic(lk.tofull_topic("user/get"))if rc == 0:print("unsubscribe topic success:%r, mid:%r" % (rc, mid))else:print("unsubscribe topic fail:%d" % rc)elif msg == "5": # 发布消息“123”给自定义的reciver这个topicrc, mid = lk.publishtopic(lk.tofull_topic("user/test"), "123")if rc == 0:print("publish topic success:%r, mid:%r" % (rc, mid))else:print("publish topic fail:%d" % rc)elif msg == "6": # 同时订阅多个topicrc, mid = lk.subscribetopic([(lk.tofull_topic("user/sender"), 1),(lk.tofulltopic("user/get"), 1),(lk.tofulltopic("user/test"), 1)])if rc == 0:print("subscribe multiple topics success:%r, mid:%r" % (rc, mid))else:print("subscribe multiple topics fail:%d" % rc)elif msg == "7": # 同时取消订阅多个topicrc, mid = lk.unsubscribetopic([lk.tofulltopic("user/get"), lk.tofull_topic("user/test")])if rc == 0:print("unsubscribe multiple topics success:%r, mid:%r" % (rc, mid))else:print("unsubscribe multiple topics fail:%d" % rc)elif msg == "8": # RRPClk.ontopicmessage = ontopicmessageelif msg == "11": # 物模型通信,属性上报prop_data = {"Test001": "hh","memory_usage": 99}rc, requestid = lk.thingpostproperty(propdata)if rc == 0:print("propertydata post success:%r, requestid:%r" % (rc, request_id))else:print("property_data post fail:%d" % rc)elif msg == "12": # 物模型通信,事件1上报event_data = {"Testdata001": 100}rc, requestid = lk.thingtriggerevent(("Test001event",event_data))if rc == 0:print("eventdata post success:%r, requestid:%r" % (rc, request_id))else:print("event_data post fail:%d" % rc)elif msg == "13": # 物模型通信,事件2上报event_data = {"Testdata002": 1}rc, requestid = lk.thingtriggerevent(("Test002event",event_data))if rc == 0:print("eventdata post success:%r, requestid:%r" % (rc, request_id))else:print("event_data post fail:%d" % rc)elif msg == "98": # 打印topic列表?ret = lk.dumpusertopics()print("user topics:%s", str(ret))elif msg == "99": # 断开连接lk.destruct()print("destructed")else:sys.exit()

注意点

1、自行查看自定义MQTT通信的代码

2、接收消息回调,当云端发送消息到设备时发生作用,可将接收的数据输出到控制台(回调函数是程序执行过程中一直在执行的代码,只要满足相应的条件就会被运行)

3、注意Topic的格式为:“user/test”(只能是这样的,不需要输入完整的topic名称,sdk会自动补全名称,即如下图部分)

4、设备发送到云端消息查看,如下图:

4、物模型通信

(1)自定义物模型

点编辑草稿进行自定义物模型创建

自定义各项数据

点击生成设备端代码或者物模型TSL查看自定义的物模型的名称等信息(这个是作为物模型通信的凭证)

(2)实现(/document_detail/98370.html?spm=a2c4g.11186623.6.686.7e1352f7jxsvvk)

完整测试代码

from linkkit import linkkitimport sysimport time# 一机一密认证lk = linkkit.LinkKit(host_name="cn-shanghai",product_key="a1******Gk",device_name="Test1",device_secret="e2b*****************afd06abb")# 配置物模型文件lk.thing_setup("tsl.json")# 物模型可用时回调函数# def on_thing_enable(self, userdata):#print("on_thing_enable")def on_thing_enable(userdata):print("on_thing_enable")pass# 物模型不可用时回调函数def on_thing_disable(userdata):print("on_thing_disable")# 属性上报回调# def on_thing_prop_post(self, request_id, code, data, message, userdata):#print("on_thing_prop_post request id:%s, code:%d, data:%s message:%s" % (request_id, code, str(data), message))def on_thing_prop_post(request_id, code, data, message, userdata):print("on_thing_prop_post request id:%s, code:%d, data:%s message:%s" %(request_id, code, str(data), message))# 事件上报回调# def on_thing_event_post(self, event, request_id, code, data, message, userdata):#print("on_thing_event_post request id:%s, code:%d, data:%s message:%s" % (event, code, str(data), message))def on_thing_event_post(event, request_id, code, data, message, userdata):print("on_thing_event_post event:%s,request id:%s, code:%d, data:%s, message:%s" %(event, request_id, code, str(data), message))# RRPC请求回调def on_topic_rrpc_message(id, topic, payload, qos, userdata):print("on_topic_rrpc_message: id:%s, topic:%s, payload:%s" % (id, topic, payload))lk.thing_answer_rrpc(id, payload)# service请求回调def on_thing_call_service(identifier, request_id, params, userdata):print("on_thing_call_service: identifier:%s, request_id:%s, params:%s" % (identifier, request_id, params))lk.thing_answer_service(identifier, request_id, 200, {})# 连接上物联网平台后的回调,成功连接session_flag和rc返回0def on_connect(session_flag, rc, userdata):print("on_connect:%d,rc:%d:" % (session_flag, rc))pass# 断开连接物联网平台后的回调,断开后rc返回1def on_disconnect(rc, userdata):print("on_disconnect:rc:%d:" % rc)# 订阅topic回调def on_subscribe_topic(mid, granted_qos, userdata):print("on_subscribe_topic mid:%d, granted_qos:%s" %(mid, str(','.join('%s' % it for it in granted_qos))))pass# 取消订阅回调def on_unsubscribe_topic(mid, userdata):print("on_unsubscribe_topic mid:%d" % mid)pass# 接收消息回调,调用函数时会一直执行print,效果为:如果调用该方法,发布消息的topic每次发布消息都会被输出到控制台并打印出来def on_topic_message(topic, payload, qos, userdata):print("on_topic_message:" + topic + " payload:" + str(payload) + " qos:" + str(qos))pass# 当出现网络波动时,程序会自动循环调用连接,显示的效果为这两个回调函数会被一直调用lk.on_connect = on_connectlk.on_disconnect = on_disconnectlk.on_subscribe_topic = on_subscribe_topic # 订阅topic回调lk.on_unsubscribe_topic = on_unsubscribe_topic # 取消订阅topic回调lk.on_topic_message = on_topic_message # 接收topic消息回调lk.on_topic_rrpc_message = on_topic_rrpc_message # 普通,接收RRPC请求回调lk.on_thing_call_service = on_thing_call_service # 物模型,处理同步类型的servicelk.on_thing_enable = on_thing_enable # 物模型功能可用时回调lk.on_thing_disable = on_thing_disable # 物模型功能不可用时回调lk.on_thing_prop_post = on_thing_prop_post # 属性上报成功时回调lk.on_thing_event_post = on_thing_event_post # 事件上报成功时回调lk.connect_async() # 连接物联网平台lk.start_worker_loop() # 保持连接print("connected")# 增加while循环的作用:保证物联网平台是连接上之后再进行通信的while True:try:msg = input() # 获取从控制台的输入except KeyboardInterrupt:sys.exit()else:if msg == "1":lk.disconnect()elif msg == "2":lk.connect_async()elif msg == "3": # 输入为3时,订阅get这个topic,每个订阅的topic只需订阅一次即可,会在物联网平台的设备topic列表中显示rc, mid = lk.subscribe_topic(lk.to_full_topic("user/get")) # 注意topic只需要写成这样的格式即可,格式需要完全一致,全称会自动补全,不需要输入设备名if rc == 0: # rc返回值为0时则表示订阅成功print("subscribe topic success:%r, mid:%r" % (rc, mid))else:print("subscribe topic fail:%d" % rc)elif msg == "4": # 取消订阅rc, mid = lk.unsubscribe_topic(lk.to_full_topic("user/get"))if rc == 0:print("unsubscribe topic success:%r, mid:%r" % (rc, mid))else:print("unsubscribe topic fail:%d" % rc)elif msg == "5": # 发布消息“123”给自定义的test这个topicrc, mid = lk.publish_topic(lk.to_full_topic("user/test"), "123")if rc == 0:print("publish topic success:%r, mid:%r" % (rc, mid))else:print("publish topic fail:%d" % rc)elif msg == "6": # 同时订阅多个topicrc, mid = lk.subscribe_topic([(lk.to_full_topic("user/sender"), 1),(lk.to_full_topic("user/get"), 1),(lk.to_full_topic("user/test"), 1)])if rc == 0:print("subscribe multiple topics success:%r, mid:%r" % (rc, mid))else:print("subscribe multiple topics fail:%d" % rc)elif msg == "7": # 同时取消订阅多个topicrc, mid = lk.unsubscribe_topic([lk.to_full_topic("user/get"), lk.to_full_topic("user/test")])if rc == 0:print("unsubscribe multiple topics success:%r, mid:%r" % (rc, mid))else:print("unsubscribe multiple topics fail:%d" % rc)elif msg == "8": # RRPCrc, mid = lk.subscribe_rrpc_topic("/testA")if rc == 0: # rc返回值为0时则表示订阅成功print("subscribe topic success:%r, mid:%r" % (rc, mid))else:print("subscribe topic fail:%d" % rc)elif msg == "11": # 物模型通信,属性上报prop_data = {"Test001": "hh","memory_usage": 99}rc, request_id = lk.thing_post_property(prop_data)if rc == 0:print("property_data post success:%r, request_id:%r" % (rc, request_id))else:print("property_data post fail:%d" % rc)elif msg == "12": # 物模型通信,事件1上报event_data = {"Testdata001": 100}rc, request_id = lk.thing_trigger_event(("Test001_event",event_data))if rc == 0:print("event_data post success:%r, request_id:%r" % (rc, request_id))else:print("event_data post fail:%d" % rc)elif msg == "13": # 物模型通信,事件2上报event_data = {"Testdata002": 1}rc, request_id = lk.thing_trigger_event(("Test002_event",event_data))if rc == 0:print("event_data post success:%r, request_id:%r" % (rc, request_id))else:print("event_data post fail:%d" % rc)elif msg == "98": # 打印topic列表?ret = lk.dump_user_topics()print("user topics:%s", str(ret))elif msg == "99": # 断开连接lk.destruct()print("destructed")else:sys.exit()

RRPC代码

注意:上面实现的完整代码中rrpc部分和阿里云的文档具有差别,以我这个为准,主要是在回调函数的名称和函数定义是的self上的差别,还要注意订阅rrpc的topic时候的订阅方法和mqtt的订阅方法不一致

阿里云python sdk中的rrpc功能

rrpc在线调用

下面的代码中注意需要加上自定的topic时的格式需要和上面实现代码中的逻辑8中的topic格式一致

!/usr/bin/env pythoncoding=utf-8from aliyunsdkcore.client import AcsClientfrom aliyunsdkcore.request import CommonRequestclient = AcsClient('LTAI************XR3', 'Dh******9fw', 'cn-shanghai')request = CommonRequest()request.setacceptformat('json')request.set_domain('-')request.set_method('POST')request.setprotocoltype('https') # https | httprequest.set_version('-01-20')request.setactionname('RRpc')request.addqueryparam('RegionId', "cn-shanghai")request.addqueryparam('DeviceName', "Test1")request.addqueryparam('Timeout', "5000")request.addqueryparam('RequestBase64Byte', "dG*********Gxl")request.addqueryparam('ProductKey', "a1V************Gk")request.add_query_param('Topic', "/testA")response = client.do_action(request)python2: print(response)print(str(response, encoding = 'utf-8'))

注意点

1、首先注意要配置物模型文件,即lk.thing_setup(“tsl.json”)

2、注意属性、事件和服务的上报方式各不相同

3、查看上报数据,如下图

四、服务端开发

1、AMQP(/document_detail/142489.html?spm=a2c4g.11186623.6.623.3e36354e3xozA7)

python3的sdk补充链接

需要注意的是签名的url一栏填写的是物联网平台实例amqp服务链接中除掉amqp://后面的部分,如下图

(1)环境设置(Qpid Proton 0.29.0直接下载地址:下载网站)

1)Ubuntu18,Python2.7,Qpid Proton 0.29.0(当前测试环境,证实可用)

安装教程

2)其他Linux系统下的python2.7和win10下的C++

安装教程

3)测试proton是否可用

import proton;print('%s' % 'SSL present' if proton.SSL.present() else 'SSL NOT AVAILABLE')

(2)实现(/document_detail/143597.html?spm=a2c4g.11186623.6.626.3800719ftRcJ40)

# encoding=utf-8import sysimport loggingimport timefrom proton.handlers import MessagingHandlerfrom proton.reactor import Containerimport hashlibimport hmacimport base64reload(sys)sys.setdefaultencoding('utf-8')logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')logger = logging.getLogger(__name__)console_handler = logging.StreamHandler(sys.stdout)def current_time_millis():return str(int(round(time.time() * 1000)))def do_sign(secret, sign_content):m = hmac.new(secret, sign_content, digestmod=hashlib.sha1)return base64.b64encode(m.digest())class AmqpClient(MessagingHandler):def __init__(self):super(AmqpClient, self).__init__()def on_start(self, event):# 接入域名,请参见AMQP客户端接入说明文档。url = "amqps://18************019.iot--:5671"accessKey = "LTA*****************XR3"accessSecret = "Dhc*************Q19fw"consumerGroupId = "xoZ***********0100"# iotInstanceId:购买的实例请填写实例ID,公共实例请填空字符串""。iotInstanceId = ""clientId = "test1"# 签名方法:支持hmacmd5,hmacsha1和hmacsha256。signMethod = "hmacsha1"timestamp = current_time_millis()# userName组装方法,请参见AMQP客户端接入说明文档。userName = clientId + "|authMode=aksign" + ",signMethod=" + signMethod \+ ",timestamp=" + timestamp + ",authId=" + accessKey \+ ",iotInstanceId=" + iotInstanceId + ",consumerGroupId=" + consumerGroupId + "|"signContent = "authId=" + accessKey + "&timestamp=" + timestamp# 计算签名,password组装方法,请参见AMQP客户端接入说明文档。passWord = do_sign(accessSecret.encode("utf-8"), signContent.encode("utf-8"))conn = event.container.connect(url, user=userName, password=passWord, heartbeat=60)self.receiver = event.container.create_receiver(conn)# 当连接成功建立被调用。def on_connection_opened(self, event):logger.info("Connection established, remoteUrl: %s", event.connection.hostname)# 当连接关闭时被调用。def on_connection_closed(self, event):logger.info("Connection closed: %s", self)# 当远端因错误而关闭连接时被调用。def on_connection_error(self, event):logger.info("Connection error")# 当建立AMQP连接错误时被调用,包括身份验证错误和套接字错误。def on_transport_error(self, event):if event.transport.condition:if event.transport.condition.info:logger.error("%s: %s: %s" % (event.transport.condition.name, event.transport.condition.description,event.transport.condition.info))else:logger.error("%s: %s" % (event.transport.condition.name, event.transport.condition.description))else:logging.error("Unspecified transport error")# 当收到消息时被调用。def on_message(self, event):message = event.messagecontent = message.body.decode('utf-8')topic = message.properties.get("topic")message_id = message.properties.get("messageId")print("receive message: message_id=%s, topic=%s, content=%s" % (message_id, topic, content))event.receiver.flow(1)Container(AmqpClient()).run()

注意

1、文中格式一定需要保持一致,accessKey、accessSecret注意要保证该RAM账号具有物联网平台权限

url:

consumerGroupId:

2、MNS(/document_detail/32305.html?spm=a2c4g.11186623.6.624.20b33dc31he3Dc,/document_detail/68948.html?spm=a2c4g.11186623.6.629.7c8b5608a7Kq8s)

(收费较高不考虑)

下载对应SDK后就在根目录进行调试

若是能够调试出接收消息的代码,请发给我,感谢

#!/usr/bin/env python#coding=utf8# Copyright (C) , Alibaba Cloud Computing#Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:#The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.#THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.import sysimport timefrom mns.account import Accountfrom mns.queue import *from mns.topic import *from mns.subscription import *try:import configparser as ConfigParserexcept ImportError:import ConfigParser as ConfigParsercfgFN = "sample.cfg"required_ops = [("Base", "AccessKeyId"), ("Base", "AccessKeySecret"), ("Base", "Endpoint")]optional_ops = [("Optional", "SecurityToken")]parser = ConfigParser.ConfigParser()parser.read(cfgFN)for sec,op in required_ops:if not parser.has_option(sec, op):sys.stderr.write("ERROR: need (%s, %s) in %s.\n" % (sec,op,cfgFN))sys.stderr.write("Read README to get help inforamtion.\n")sys.exit(1)#获取配置信息## AccessKeyId阿里云官网获取## AccessKeySecret 阿里云官网获取## Endpoint 阿里云消息和通知服务官网获取, Example: http://$-## WARNING: Please do not hard code your accessId and accesskey in next line.(more information: /articles/55947)accessKeyId = parser.get("Base", "AccessKeyId")accessKeySecret = parser.get("Base", "AccessKeySecret")endpoint = parser.get("Base", "Endpoint")securityToken = ""if parser.has_option("Optional", "SecurityToken") and parser.get("Optional", "SecurityToken") != "$SecurityToken":securityToken = parser.get("Optional", "SecurityToken")#初始化my_accountmy_account = Account(endpoint, accessKeyId, accessKeySecret, securityToken)##############Queue 相关操作#####################my_queue = my_account.get_queue("MyQueue-%s" % time.strftime("%y%m%d-%H%M%S", time.localtime()))#创建队列## message被receive后,持续不可消费的时间 100秒## message body的最大长度 10240Byte## message最长存活时间 3600秒## 新message可消费的默认延迟时间 10秒## receive message时,长轮询时间 20秒queue_meta = QueueMeta()queue_meta.set_visibilitytimeout(100)queue_meta.set_maximum_message_size(10240)queue_meta.set_message_retention_period(3600)queue_meta.set_delay_seconds(10)queue_meta.set_polling_wait_seconds(20)queue_meta.set_logging_enabled(True)try:queue_url = my_queue.create(queue_meta)sys.stdout.write("Create Queue Succeed!\nQueueURL:%s\n\n" % queue_url)except MNSExceptionBase as e:sys.stderr.write("Create Queue Fail!\nException:%s\n\n" % e)sys.exit(1)#修改队列属性## message被receive后,持续不可消费的时间 50秒## message body的最大长度 5120Byte## message最长存活时间 1800秒## 新message可消费的默认延迟时间 5秒## receive message时,长轮询时间 10秒queue_meta = QueueMeta()queue_meta.set_visibilitytimeout(50)queue_meta.set_maximum_message_size(5120)queue_meta.set_message_retention_period(1800)queue_meta.set_delay_seconds(5)queue_meta.set_polling_wait_seconds(10)try:queue_url = my_queue.set_attributes(queue_meta)sys.stdout.write("Set Queue Attributes Succeed!\n\n")except MNSExceptionBase as e:sys.stderr.write("Set Queue Attributes Fail!\nException:%s\n\n" % e)sys.exit(1)#获取队列属性## 除可设置属性外,返回如下属性:## ActiveMessages:可消费消息数,近似值## InactiveMessages: 正在被消费的消息数,近似值## DelayMessages:延迟消息数,近似值## CreateTime: queue创建时间,单位:秒## LastModifyTime:修改queue属性的最近时间,单位:秒try:queue_meta = my_queue.get_attributes()sys.stdout.write("Get Queue Attributes Succeed! \\nQueueName: %s\nVisibilityTimeout: %s \\nMaximumMessageSize: %s\nDelaySeconds: %s \\nPollingWaitSeconds: %s\nActiveMessages: %s \\nInactiveMessages: %s\nDelayMessages: %s \\nCreateTime: %s\nLastModifyTime: %s\n\n" %(queue_meta.queue_name, queue_meta.visibility_timeout,queue_meta.maximum_message_size, queue_meta.delay_seconds,queue_meta.polling_wait_seconds, queue_meta.active_messages,queue_meta.inactive_messages, queue_meta.delay_messages,queue_meta.create_time, queue_meta.last_modify_time))except MNSExceptionBase as e:sys.stderr.write("Get Queue Attributes Fail!\nException:%s\n\n" % e)sys.exit(1)#列出所有队列## prefix指定queue name前缀## ret_number 单次list_queue最大返回队列个数## markerlist_queue的开始位置; 当一次list queue不能列出所有队列时,返回的next_marker作为下一次list queue的marker参数try:prefix = u""ret_number = 10marker = u""total_qcount = 0while(True):queue_url_list, next_marker = my_account.list_queue(prefix, ret_number, marker)total_qcount += len(queue_url_list)for queue_url in queue_url_list:sys.stdout.write("QueueURL:%s\n" % queue_url)if(next_marker == ""):breakmarker = next_markersys.stdout.write("List Queue Succeed! Total Queue Count:%s!\n\n" % total_qcount)except MNSExceptionBase as e:sys.stderr.write("List Queue Fail!\nException:%s\n\n" % e)sys.exit(1)#发送消息## set_delayseconds设置消息的延迟时间,单位:秒## set_priority 设置消息的优先级## 返回如下属性:## MessageId 消息编号## MessageBodyMd5 消息正文的MD5值msg_body = "I am test Message."message = Message(msg_body)message.set_delayseconds(0)message.set_priority(10)try:send_msg = my_queue.send_message(message)sys.stdout.write("Send Message Succeed.\nMessageBody:%s\nMessageId:%s\nMessageBodyMd5:%s\n\n" % (msg_body, send_msg.message_id, send_msg.message_body_md5))except MNSExceptionBase as e:sys.stderr.write("Send Message Fail!\nException:%s\n\n" % e)sys.exit(1)

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。