Today, I will explain a real-life problem I have solved using Python. Turning on and off the laptop charger is an annoying task for me. I often forgot to turn on the charge before the battery drained completely. So I decide to make a system that automates my charging using NodeMCU and Hive MQTT broker. We can create a free hive account by visiting this
Plan
- A script that checks battery percentage and publishes “on or off” to a topic
- Write a cron job to trigger the above script every 5 minutes
- NodeMCU subscribes to the above topic and control plug using a relay module
Circuit
Components Required
- NodeMCU
- AC to DC convertor
- Relay Module
- Plug Point
- PVC box
- Indicator(optional)
- Wire
- Prototype board
- Wire connector
Making Video
Code in Laptop
If you are not familiar with MQTT using Python, Please visit MQTT using Python
psutil is used for finding the battery status and certifi is used for the certificate in case you are HTTPS.
import random
import time
from paho.mqtt import client as mqtt_client
import certifi #For Ceritificate
import psutil # For Checking battery Percentage
## MQTT broker details here
broker = '<broker host here>'
port = <port here>
topic = <your topic here>
client_id = f'python-mqtt-{random.randint(0, 1000)}'
username = <username>
password = <password>
# .........................................................................
connected = False
max_threshold = 90 #maximum threshold where we need to off the supply
min_threshold = 30 ##minimum threshold where we need to off the supply
def on_connect(client, userdata, flags, rc):
if rc == 0:
global connected
connected = True
battery = psutil.sensors_battery()
message = "False"
if battery.power_plugged:
if battery.percent >= max_threshold:
message = "False"
else:
message = "True"
else:
if battery.percent <= min_threshold:
message = "True"
print("message is: ", message)
result = client.publish(topic, message)
else:
print("Failed to connect, return code %d\n", rc)
def connect_mqtt():
client = mqtt_client.Client(client_id)
client.username_pw_set(username, password)
client.tls_set(certifi.where())
client.on_connect = on_connect
client.connect(broker, port)
return client
def run():
client = connect_mqtt()
client.loop_start()
while not connected:
print("connecting ....")
time.sleep(1)
if __name__ == '__main__':
run()
Using crontab i run this code every 5 minutes
NodeMCU Code
If you are not familiar with running python in NodeMCU. Please visit Run MicroPython on NodeMCU
I am using a package called umqttsimple for connecting to nodemcu.
import time
from umqttsimple import MQTTClient
import ubinascii
import machine
import network
import gc
gc.collect() # Collect Garbages
output_pin = machine.Pin(2, Pin.OUT) ## Setting pin 2(GPIO2 or D4) as output pin
ssid = <your ssid>
password = <your password>
mqtt_server = <mqtt host>
client_id = ubinascii.hexlify(machine.unique_id()) ## Generate Unique client id
station = network.WLAN(network.STA_IF) # Setting NodeMCU as Wifi Client
station.active(True)
station.connect(ssid, password) # Trigger connection
while station.isconnected() == False:
pass
## Wait until connected
def subscribe_callback(topic, msg):
"""Subscribe callback function when new message is send
This will be triggered
First parameter is the topic which message is recieved
second parameter is the message
"""
if topic == b'socket':
if msg == b'True':
output_pin.off() # Set pin value to Low
else:
output_pin.on() # Set pin value to High
def connect_and_subscribe():
global client_id, mqtt_server, topic_sub
#If you are using SSL then ssl=True and ssl_params must be passed else no need
client = MQTTClient(client_id, mqtt_server, user=<MQTT Username>, password=<MQTT password>,
ssl=True, ssl_params={"server_hostname": <MQTT Host>})
client.set_callback(subscribe_callback) #Setting callback function for client
client.connect() #Trigger connection
client.subscribe("socket") # Subscribe to a top socket
return client
def restart_and_reconnect():
# When nodemcu get stuck use this function to reset
print('Failed to connect to MQTT broker. Reconnecting...')
time.sleep(10)
machine.reset() # Reset machine
try:
client = connect_and_subscribe()
except OSError as e:
restart_and_reconnect()
while True:
try:
client.check_msg() #Check the message in infinite loop
except OSError as e:
restart_and_reconnect()
These system solved my biggest issue while working. Hope you have learned something from the post. If you find this useful please share with your friends. Also please share your suggestions at afsal@parseltongue.co.in
Originally published at https://parseltongue.co.in on June 6, 2022.