久久久精品一区ed2k-女人被男人叉到高潮的视频-中文字幕乱码一区久久麻豆樱花-俄罗斯熟妇真实视频

PythonMQTT客戶端怎么使用

本篇內(nèi)容介紹了“Python MQTT客戶端怎么使用”的有關(guān)知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細閱讀,能夠?qū)W有所成!

成都創(chuàng)新互聯(lián)是一家集網(wǎng)站建設(shè),華容企業(yè)網(wǎng)站建設(shè),華容品牌網(wǎng)站建設(shè),網(wǎng)站定制,華容網(wǎng)站建設(shè)報價,網(wǎng)絡(luò)營銷,網(wǎng)絡(luò)優(yōu)化,華容網(wǎng)站推廣為一體的創(chuàng)新建站企業(yè),幫助傳統(tǒng)企業(yè)提升企業(yè)形象加強企業(yè)競爭力??沙浞譂M足這一群體相比中小企業(yè)更為豐富、高端、多元的互聯(lián)網(wǎng)需求。同時我們時刻保持專業(yè)、時尚、前沿,時刻以成就客戶成長自我,堅持不斷學(xué)習(xí)、思考、沉淀、凈化自己,讓我們?yōu)楦嗟钠髽I(yè)打造出實用型網(wǎng)站。

paho-mqtt

paho-mqtt 可以說是 Python MQTT 開源客戶端庫中的佼佼者。它由 Eclipse 基金會主導(dǎo)開發(fā),除了 Python 庫以外,同樣支持各大主流的編程語言,比如 C++、Java、JavaScript、Golang 等。目前 Python 版本已經(jīng)實現(xiàn)了 3.1 和 3.1.1 MQTT 協(xié)議,在最新開發(fā)版中實現(xiàn)了 MQTT 5.0。

在基金會的支持下,以每年一個版本的速度更新,本文發(fā)布時的最新版本為 1.5.0(于 2019 年 8 月發(fā)布)。

在 GitHub 主頁上,它提供了從入門的快速實現(xiàn)到每一個函數(shù)的詳細解讀,涵蓋了從初學(xué)者到高級使用者需要了解的各個部分。即使遇到超出范圍的問題,在 Google 上搜索,可以得到近 20 萬個相關(guān)詞條,是目前最為流行的 MQTT 客戶端。

得到如此多的關(guān)注度,除了穩(wěn)定的代碼外,還有其易用性。Paho 的接口使用非常簡單優(yōu)雅,您只需要少量的代碼就能實現(xiàn) MQTT 的訂閱及消息發(fā)布。

安裝

pip3 install paho-mqtt

或者

git clone https://github.com/eclipse/paho.mqtt.python
cd paho.mqtt.python
python3 setup.py install

訂閱者

import paho.mqtt.client as mqtt

# 連接的回調(diào)函數(shù)
def on_connect(client, userdata, flags, rc):
    print(f"Connected with result code {rc}")
    client.subscribe("$SYS/#")
    
# 收到消息的回調(diào)函數(shù)
def on_message(client, userdata, msg):
    print(msg.topic+" "+str(msg.payload))
    
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.connect("broker.emqx.io", 1883, 60)
client.loop_forever()

發(fā)布者

import paho.mqtt.client as mqtt
import time
def on_connect(client, userdata, flags, rc):
    print(f"Connected with result code {rc}")
    
client = mqtt.Client()
client.on_connect = on_connect
client.connect("broker.emqx.io", 1883, 60)
for i in range(3):
    client.publish('a/b', payload=i, qos=0, retain=False)
    print(f"send {i} to a/b")
    time.sleep(1)

client.loop_forever()

甚至,你可以通過一行代碼,實現(xiàn)訂閱、發(fā)布。

import paho.mqtt.subscribe as subscribe

# 當調(diào)用這個函數(shù)時,程序會堵塞在這里,直到有一條消息發(fā)送到 paho/test/simple 主題
msg = subscribe.simple("paho/test/simple", hostname="broker.emqx.io")
print(f"{msg.topic} {msg.payload}")
import paho.mqtt.publish as publish

# 發(fā)送一條消息
publish.single("a/b", "payload", hostname="broker.emqx.io")
# 或者一次發(fā)送多個消息
msgs = [{'topic':"a/b", 'payload':"multiple 1"}, ("a/b", "multiple 2", 0, False)]
publish.multiple(msgs, hostname="broker.emqx.io")

HBMQTT

HBMQTT 基于 Python asyncio 開發(fā),僅支持 3.1.1 的 MQTT 協(xié)議。由于使用 asyncio 庫,開發(fā)者需要使用 3.4 以上的 Python 版本。

CPU 的速度遠遠快于磁盤、網(wǎng)絡(luò)等 IO 操作,而在一個線程中,無論 CPU 執(zhí)行得再快,遇到 IO 操作時,都得停下來等待讀寫完成,這無疑浪費了許多時間。

為了解決這個問題,Python 加入了異步 IO 的特性。在 Python 3.4 中,正式將 asyncio 納入標準庫中,并在 Python 3.5 中,加入了 async/await 關(guān)鍵字。用戶可以很輕松的使用在函數(shù)前加入 async 關(guān)鍵字,使函數(shù)變成異步函數(shù)。

HBMQTT 便是建立在 asyncio 標準庫之上。它允許用戶顯示的設(shè)置異步斷點,通過異步 IO,MQTT 客戶端在收取消息或發(fā)送消息時,掛載當前的任務(wù),繼續(xù)處理下一個。

不過 HBMQTT 的知名度卻小得多。在 Google 上搜索,關(guān)于 HBMQTT 僅有 6000 多個詞條,在 Stack Overflow 上只有 10 個提問數(shù)。這就意味著,如果選擇 HBMQTT 的話你需要很強的解決問題的能力。

有意思的是,HBMQTT 本身也是一個 MQTT 服務(wù)器。你可以通過 hbmqtt 命令一鍵開啟。

$ hbmqtt
[2020-08-28 09:35:56,608] :: INFO - Exited state new
[2020-08-28 09:35:56,608] :: INFO - Entered state starting
[2020-08-28 09:35:56,609] :: INFO - Listener 'default' bind to 0.0.0.0:1883 (max_connections=-1)

安裝

pip3 install hbmqtt

或者

git clone https://github.com/beerfactory/hbmqtt
cd hbmqtt
python3 setup.py install

訂閱者

import logging
import asyncio
from hbmqtt.client import MQTTClient, ClientException
from hbmqtt.mqtt.constants import QOS_1, QOS_2

async def uptime_coro():
    C = MQTTClient()
    await C.connect('mqtt://broker.emqx.io/')
    await C.subscribe([
            ('$SYS/broker/uptime', QOS_1),
            ('$SYS/broker/load/#', QOS_2),
         ])
    try:
        for i in range(1, 100):
            message = await C.deliver_message()
            packet = message.publish_packet
            print(f"{i}:  {packet.variable_header.topic_name} => {packet.payload.data}")
        await C.unsubscribe(['$SYS/broker/uptime', '$SYS/broker/load/#'])
        await C.disconnect()
    except ClientException as ce:
        logging.error("Client exception: %s" % ce)
        
if __name__ == '__main__':
    formatter = "[%(asctime)s] %(name)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s"
    logging.basicConfig(level=logging.DEBUG, format=formatter)
    asyncio.get_event_loop().run_until_complete(uptime_coro())

發(fā)布者

import logging
import asyncio
import time
from hbmqtt.client import MQTTClient
from hbmqtt.mqtt.constants import QOS_0, QOS_1, QOS_2

async def test_coro():
    C = MQTTClient()
    await  C.connect('mqtt://broker.emqx.io/')
    tasks = [
        asyncio.ensure_future(C.publish('a/b', b'TEST MESSAGE WITH QOS_0', qos=QOS_0)),
        asyncio.ensure_future(C.publish('a/b', b'TEST MESSAGE WITH QOS_1', qos=QOS_1)),
        asyncio.ensure_future(C.publish('a/b', b'TEST MESSAGE WITH QOS_2', qos=QOS_2)),
    ]
    await asyncio.wait(tasks)
    logging.info("messages published")
    await C.disconnect()
    
if __name__ == '__main__':
    formatter = "[%(asctime)s] %(name)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s"
    logging.basicConfig(level=logging.DEBUG, format=formatter)
    asyncio.get_event_loop().run_until_complete(test_coro())

更多使用細節(jié)情參考官方文檔:https://hbmqtt.readthedocs.io/en/latest/。

gmqtt

gmqtt 是由個人開發(fā)者開源的客戶端庫。默認支持 MQTT 5.0 協(xié)議,如果連接的 MQTT 代理不支持 5.0 協(xié)議,則會降級到 3.1 并重新進行連接。

相較于前兩者,gmqtt 還屬于初級開發(fā)階段,本文發(fā)布時的版本號是 0.6.7。但它是早期支持 MQTT 5.0 的 Python 庫之一,因此在網(wǎng)絡(luò)上知名度尚可。

同樣,它建立在 asyncio 庫上,因此需要使用 Python 3.4 以上的版本。

安裝

pip3 install gmqtt

或者

git clone https://github.com/wialon/gmqtt
cd gmqtt
python3 setup.py install

訂閱者

import asyncio
import os
import signal
import time
from gmqtt import Client as MQTTClient

STOP = asyncio.Event()

def on_connect(client, flags, rc, properties):
    print('Connected')
    
def on_message(client, topic, payload, qos, properties):
    print(f'RECV MSG: {topic} {payload}')
    
def on_subscribe(client, mid, qos, properties):
    print('SUBSCRIBED')
    
def on_disconnect(client, packet, exc=None):
    print('Disconnected')
    
def ask_exit(*args):
    STOP.set()

async def main(broker_host):
    client = MQTTClient("client-id")
    
    client.on_connect = on_connect
    client.on_message = on_message
    client.on_subscribe = on_subscribe
    client.on_disconnect = on_disconnect
    
    # 連接 MQTT 代理
    await client.connect(broker_host)
    
    # 訂閱主題
    client.subscribe('TEST/#')
    
    # 發(fā)送測試數(shù)據(jù)
    client.publish("TEST/A", 'AAA')
    client.publish("TEST/B", 'BBB')
    
    await STOP.wait()
    await client.disconnect()
    
if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    
    loop.add_signal_handler(signal.SIGINT, ask_exit)
    loop.add_signal_handler(signal.SIGTERM, ask_exit)

    host = 'broker.emqx.io'
    loop.run_until_complete(main(host))

發(fā)布者

import asyncio
import os
import signal
import time
from gmqtt import Client as MQTTClient

STOP = asyncio.Event()

def on_connect(client, flags, rc, properties):
    print('Connected')
    client.subscribe('TEST/#', qos=0)
    
def on_message(client, topic, payload, qos, properties):
    print(f'RECV MSG: {topic}, {payload}')
    
def on_disconnect(client, packet, exc=None):
    print('Disconnected')
    
def ask_exit(*args):
    STOP.set()
    
async def main(broker_host):
    client = MQTTClient("client-id")
    
    client.on_connect = on_connect
    client.on_message = on_message
    client.on_disconnect = on_disconnect
    
    await client.connect(broker_host)
    
    client.publish('TEST/TIME', str(time.time()), qos=1)
    
    await STOP.wait()
    await client.disconnect()

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    
    loop.add_signal_handler(signal.SIGINT, ask_exit)
    loop.add_signal_handler(signal.SIGTERM, ask_exit)
    
    host = 'broker.emqx.io'  
    loop.run_until_complete(main(host))

如何選擇

在介紹完這三款 Python MQTT 客戶端庫之后,我們再來看看如何為自己選擇合適的 MQTT 客戶端庫。這三個客戶端各有自己的優(yōu)缺點:

paho-mqtt 有著最優(yōu)秀的文檔,代碼風(fēng)格易于理解,同時有著強大的基金會支持,但目前文檔的版本還不支持 MQTT 5.0。

HBMQTT 使用 asyncio 庫實現(xiàn),可以優(yōu)化網(wǎng)絡(luò) I/O 帶來的延遲。但是代碼風(fēng)格不友好,同樣不支持 MQTT 5.0。

gmqtt 同樣通過 asyncio 庫實現(xiàn),相比 HBMQTT ,代碼風(fēng)格友好,最重要的是,它支持 MQTT 5.0。但開發(fā)進程慢,未來前景不明。

因此,在選擇時,您可以參考一下的思路:

  • 如果您是正常開發(fā),想要將其運用在生產(chǎn)環(huán)境中,paho-mqtt 無疑是最好的選擇,其穩(wěn)定性和代碼易讀性遠遠超過其它兩個庫。在遇到問題時,優(yōu)秀的文檔和互聯(lián)網(wǎng)上大量的詞條,也能幫您找到更多的解決方案。

  • 對于熟練使用 asyncio 庫的讀者,不妨嘗試一下 HBMQTT 和 gmqtt。

  • 如果您想要學(xué)習(xí)、參與開源項目或者使用 MQTT 5.0, 則不妨試用一下 gmqtt,并嘗試為其共享一份代碼吧。

“Python MQTT客戶端怎么使用”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識可以關(guān)注創(chuàng)新互聯(lián)網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實用文章!

本文標題:PythonMQTT客戶端怎么使用
本文地址:http://sd-ha.com/article16/gpssgg.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站營銷、定制開發(fā)、企業(yè)建站、網(wǎng)站設(shè)計公司、App設(shè)計、自適應(yīng)網(wǎng)站

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

商城網(wǎng)站建設(shè)