Dateien nach "pg26" hochladen

This commit is contained in:
2025-10-30 00:20:40 +00:00
parent bd19860ae7
commit b83875874b

390
pg26/pndSRC.py Normal file
View File

@@ -0,0 +1,390 @@
import network
import pndConfig as cfg
from mqtt import MQTTClient
import time
import ntptime
import machine
import socket
import array
import gc
import sys
import ujson
import dht
from machine import Pin, I2C, ADC
class dtime():
def __init__(self):
year, mon, day, h, m, s, nope, nope2 = time.localtime()
h = h + 2
year = year - 2000
if(h == 24 ):
h = 0
elif(h == 25):
h = 1
self.date = str(day) + "." + str(mon) + "." + str(year)
if(m < 10):
sM = "0" + str(m)
else:
sM = str(m)
if(s < 10):
sS = "0" + str(s)
else:
sS = str(s)
if(h < 10):
sH = "0" + str(h)
else:
sH = str(h)
self.time = sH + ":" + sM+ ":" + sS
class sens():
ADC = False
def __init__(self, id, type, pin):
self.id = id
self.type = type
class_ = getattr(dht, self.type)
self.sensor = class_(Pin(pin))
def update(self):
self.sensor.measure()
self.temp = self.sensor.temperature()
self.hum = self.sensor.humidity()
return self.temp, self.hum
def getStr(self):
self.sensor.measure()
self.temp = self.sensor.temperature()
self.hum = self.sensor.humidity()
return str(self.temp), str(self.hum)
class adcsens():
ADC = True
def __init__(self, id, pin):
self.adc = ADC(Pin(pin))
self.adc.atten(ADC.ATTN_11DB)
self.adc.width(ADC.WIDTH_12BIT)
self.id = id
def update(self):
self.percent = (cfg.defaults.gc_adc_max-self.adc.read())*100/(cfg.defaults.gc_adc_max-cfg.defaults.gc_adc_min)
self.value = self.adc.read()
return self.percent
def getStr(self):
self.update()
return str(self.percent)
class display():
def __init__(self):
self.i2c = I2C(sda=Pin(4), scl=Pin(5))
self.oled = ssd1306.SSD1306_I2C(64, 48, self.i2c)
self.view = 0
def addTxt(self, text, posx, posy, color):
self.oled.text(text, posx, posy, color)
def flush(self):
self.oled.fill(1)
self.oled.show()
self.oled.fill(0)
self.oled.show()
def show(self):
self.oled.show()
def invert(self):
if(self.view == 0):
self.oled.invert(1)
self.view = 1
else:
self.oled.invert(0)
self.view = 0
def contrast(self, cont):
self.oled.contrast(cont)
class mqtt():
def mqtt_callback(self, topic,msg):
print(msg)
def connect(self):
client = MQTTClient(cfg.gc_name, cfg.mqtt.gc_host,user=cfg.mqtt.gc_user, password=cfg.mqtt.gc_secret, port=cfg.mqtt.gc_port, keepalive=5)
client.set_callback(self.mqtt_callback)
client.connect()
print('mqtt connected!')
return client
def disconnect(self, client):
client.disconnect()
print('mqtt disconnected!')
def publish(self, client, topic, msg ):
client.publish(topic, msg)
print('mqtt message send to topic: ' + topic )
def subscribe(self, client):
client.subscribe(cfg.mqtt.gc_topic)
def ping(self):
client = self.client
try:
client.ping()
return True
except Exception as e:
return False
class wifi():
wlan = network.WLAN(network.STA_IF)
lastTry = 0
e = "wifi struggle"
def connect(self):
conntimeout = 0
wlan = self.wlan
wlan.active(False)
wlan.active(True)
wlan.connect(cfg.wifi.gc_ssid, cfg.wifi.gc_secret)
print('connecting to: ' + cfg.wifi.gc_ssid)
while not wlan.isconnected():
print('connecting...')
time.sleep(1)
conntimeout = conntimeout + 1
if(conntimeout > 20): break
if(not wlan.isconnected()): ecx.handle(ecx(),self,self.e)
print(wlan.ifconfig())
print('connected!')
def disconnect(self):
wlan = self.wlan
wlan.disconnect()
wlan.active(False)
print('disconnected!')
def reconnect(self):
wlan = self.wlan
wlan.disconnect()
wlan.active(False)
print('disconnected!')
wlan.active(False)
del wlan
time.sleep(1)
self.wlan = network.WLAN(network.STA_IF)
self.wlan.active(True)
conntimeout = 1
self.wlan.connect(cfg.wifi.gc_ssid, cfg.wifi.gc_secret)
print('connecting to: ' + cfg.wifi.gc_ssid)
while not self.wlan.isconnected():
print('connecting...')
time.sleep(1)
conntimeout = conntimeout + 1
self.lastTry = 1
if(conntimeout > 20): break
if(not self.wlan.isconnected()): ecx.handle(self, self.e)
def getMac(self):
import ubinascii
mac = ubinascii.hexlify(self.wlan.config('mac'),':').decode()
return mac
def status(self):
wlan = self.wlan
status = wlan.status()
if(status == 3):
ip, netmask, gateway, dns = wlan.ifconfig()
status = "\r\nWLan connected to: " + cfg.wifi.gc_ssid
status += "\r\nWLan IP: " + ip
status += "\r\nWLan NetMask: " + netmask
status += "\r\nWLan Gateway: " + gateway
status += "\r\nWLan DNS: " + dns
return status
else:
return "\r\nnot connected"
def getIp(self):
wlan = self.wlan
ip, netmask, gateway, dns = wlan.ifconfig()
return ip
def createAP(self):
ap = network.WLAN(network.AP_IF)
## ap.SetConfig(cfg.XY...)
ap.active(True)
## todo
pass
def disableAP(self):
ap = network.WLAN(network.AP_IF)
ap.active(False)
class bt():
def __init__(self):
import bluetooth
self.bt = bluetooth.BLE()
self.bt.active(False) # Bluetooth deaktivieren
print("Bluetooth disabled:", not self.bt.active())
class system():
def __init__(self):
self.timeInit = time.time()
def setCallback(self, obj):
self.cb = obj
def status(self):
cpufq = machine.freq() / 1000000
memfree = gc.mem_free() / 1024
memusage = gc.mem_alloc() / 1024
mem = memfree + memusage
timeDiff = time.time()-self.timeInit
(minutes, seconds) = divmod(timeDiff, 60)
(hours, minutes) = divmod(minutes, 60)
(days,hours) = divmod(hours, 24)
system = "System:\nDevice: "+ cfg.gc_name + "\nSystem: " + sys.platform + "\nFirmware: " + sys.version
system += "\n\nRessources:\nCPU Speed: " + "{:.0f}".format(cpufq) + "MHz"
system += "\nMEM All: " + "{:.2f}".format(mem) + "Kb"
system += "\nMEM Free: " + "{:.2f}".format(memfree) + "Kb"
system += "\nMEM Used: " + "{:.2f}".format(memusage) + "Kb"
system += "\nSYS Uptime: "+ str(days) + ":" + str(hours) + ":" + str(minutes) + ":" + str(seconds)
system += "\nSYS MQTT Message Count: " + "{:.0f}".format(self.cb.cycle) + "Messages"
return system
def statusJSON(self):
cpufq = machine.freq() / 1000000
memfree = gc.mem_free() / 1024
memusage = gc.mem_alloc() / 1024
mem = memfree + memusage
uptimeS = self.cb.cycle * 60
uptimeM = uptimeS / 60
uptimeH = uptimeM / 60
uptimeD = uptimeH / 24
jsSys = ujson.dumps({ "ip": self.cb.wifi.getIp(), "name": cfg.gc_name, "mqtt_msg_count": self.cb.cycle, "uptime_min" : uptimeM, "uptime_days" : "{:.2f}".format(uptimeD),
"uptime_hr" : "{:.2f}".format(uptimeH), "memory" : "{:.2f}".format(mem), "mem_free" : "{:.2f}".format(memfree), "mem_usage" : "{:.2f}".format(memusage) })
return jsSys
class ecx():
def handle(obj, e):
try:
if(hasattr(obj, 'lastTry')):
if(obj.lastTry == 1):
print("FATAL ERROR - Restart device. Error: " + str(e))
time.sleep(5)
machine.reset()
print("FATAL ERROR - Trying recovery from: " + str(e))
time.sleep(2)
del obj
import pnd as recoverPND
fresh_runtime = recoverPND.rt()
fresh_runtime.init(1)
time.sleep(1)
fresh_runtime.run()
except:
import pnd as lastPND
last_runtime = lastPND.rt()
last_runtime.init(1)
time.sleep(1)
last_runtime.run()
class webserver():
sockl = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sockl.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
def run(self):
s = self.sockl
s.bind(('', cfg.defaults.gc_websrvport))
s.listen(5)
print('WebServices up and running! Waiting for connections...')
while True:
try:
conn, addr = s.accept()
print('Got a connection from %s' % str(addr))
request = conn.recv(1024)
self.handler(str(request), conn)
gc.collect()
time.sleep(0.2)
except Exception as e:
pass
def handler(self, request, conn):
#if json data handle with care
json = request.find('/?json=')
if(json == 6):
self.JSONHandler(request, conn)
exit()
get = request.find('/?get=system')
fun = request.find('/?fun=')
print(get)
response = ""
if(get == 6):
s = "jo läuft das ding :D"
final = s.replace("\n", "<br>")
response += '<html><head><title>Pundo Web Server</title></head><body><h1>Pundo Web Server</h1><p><strong>' + final + '</strong></p></body></html>'
print("bin 2")
conn.send('HTTP/1.1 200 OK\n')
conn.send('Content-Type: text/html\n')
conn.send('Connection: close\n\n')
conn.sendall(response)
conn.close()
elif(fun == 6):
print('RFC')
response += '<html><head><title>Pundo Web Server</title></head><body><h1>Pundo Web Server</h1><p><strong>RFC in progress...</strong></p></body></html>'
conn.send('HTTP/1.1 200 OK\n')
conn.send('Content-Type: text/html\n')
conn.send('Connection: keep-alive\n')
conn.send('Keep-Alive: timeout=30, max=100\n\n')
conn.sendall(response)
self.funHandler(response, conn)
#conn.close()
#time.sleep(3)
#machine.reset()
else:
response += '<html><head><title>Pundo Web Server</title></head><body><h1>Pundo Web Server</h1><p><strong>Wrong Command</strong></p></body></html>'
conn.send('HTTP/1.1 200 OK\n')
conn.send('Content-Type: text/html\n')
conn.send('Connection: close\n\n')
conn.sendall(response)
conn.close()
def JSONHandler(self, request, conn):
pass
def funHandler(self, request, conn):
fun = request.find('/?fun=')
class pndIFBase:
def pndObj(self, id:str) -> Object:
"""Test pls hang on"""
pass
def pndTest(self) ->str:
return "Still in development please hang on"
class httpRQ(pndIFBase):
def pndObj(self):
return self
def post(self, url, data):
post_data = data ##ujson.dumps(data)
request_url = url
res = requests.post(request_url, headers = {'content-type': 'application/json'}, data = post_data)
return res.text
# init system
#wifi = wifi()
#Psystem = system()