Compare commits

..

8 Commits

4 changed files with 279 additions and 168 deletions

1
.gitignore vendored
View File

@ -1 +1,2 @@
*.csv
__pycache__

204
python/Muell/__init__.py Normal file
View File

@ -0,0 +1,204 @@
'''
Um die Werte für kommune und strasse zu erfahren, rufe https://www.egst.de/de/abfallabholung/ per
Browser auf und ermittle die gesendeten Daten in der Entwicklerkonsole deines Browsers.
Die Angaben hier im Skript stehen für Hörstel als Kommune und Im Wiesengrund als Strasse
Um die Daten an Telegram zu senden, musst du dir ein Skript/Programm schreiben, dass die API
von Telegram verwendet und Daten an dich oder deine Gruppe senden kann!
Beispiel für eine Konfigurationsdatei:
kommune: 2601
strasse: 2146
telegram:
tgReceiver: <hier die ID der Gruppe/ des Teilnehmers eintragen>
pathBot: '~/bin/YMBot'
signal:
path: "~/bin/signal-cli"
account: "+49<deine Mobilfunknummer>"
group_id: "<siehe 'signal-cli listGroups'>"
'''
import csv
import os
import re
import requests
import httplib2
import urllib
import yaml
from datetime import datetime, timedelta
from pathlib import Path
class Muell:
# Variablen
__key = 'e21758b9c711463552fb9c70ac7d4273'
__modus = 'd6c5855a62cf32a4dadbc2831f0f295f'
__host = 'api.abfall.io'
__url = f'https://{__host}/?key={__key}&modus={__modus}&waction=export_csv'
__export_als = f"{{'action':'{__url}','target':''}}"
__current_year = datetime.today().year
__zeitraum = f'{__current_year}0101-{__current_year}1231'
__postdata = {
'f_id_abfalltyp_0': '50',
'f_id_abfalltyp_1': '161',
'f_id_abfalltyp_2': '53',
'f_id_abfalltyp_3': '187',
'f_id_abfalltyp_4': '169',
'f_abfallarten_index_max': '5',
'f_abfallarten': '50,161,53,187',
'f_zeitraum': __zeitraum,
'f_export_als': __export_als,
}
__headers = {
'User-Agent': 'Mozilla/5.0 (Linux; x68_64; x64; rv:88.0) Gecko/20100101 Firefox/88.0',
'Content-Type': 'application/x-www-form-urlencoded',
}
# Methoden
def __get_index_positions(self, list_of_elems, element):
''' Returns the indexes of all occurrences of given element in
the list- listOfElements '''
index_pos_list = []
for i in range(len(list_of_elems)):
if list_of_elems[i] == element:
index_pos_list.append(i)
return index_pos_list
def __readConfig(self):
''' Liest ~/.muell.yaml und speichert Daten in Dictionary '''
home = Path.home() # home ist ohne / am Ende!
config = {}
with open(f'{home}/.muell.yaml', 'r') as config_file:
config = yaml.safe_load(config_file)
return config
def __init(self, key, modus, host, headers):
''' Initialisiert das System und sucht entsprechende Daten heraus '''
url = f'https://{host}/?key={key}&modus={modus}&waction=init'
http = httplib2.Http()
(resp, content) = http.request(url, "POST", headers = headers, body = urllib.parse.urlencode(self.__postdata))
result = re.findall(r"<input .*?name=.*? value=.*?/>", str(content))
datum = result[1].split(' ')
name = None
value = None
for i in datum:
i = i.split('=')
if i[0] == 'name':
name = i[1][1:-1]
if i[0] == 'value':
value = i[1][1:-1]
return name, value
def __read_file(self, jahr):
''' Liest Daten aus muell<jahr>.csv und liefert ggf. Ergebnis zurück'''
antwort_liste = list()
headline = list()
try:
with open(f'muell{jahr}.csv', 'r', encoding="latin1") as f:
csv_reader = csv.reader(f, delimiter=';')
lines = 0
antwort_liste = []
for row in csv_reader:
if lines == 0:
# print(f'Column names are {", ".join(row)}')
headline = row
lines += 1
else:
if isinstance(row, list):
antwort_liste.append(row)
except FileNotFoundError:
pass
return (antwort_liste, headline)
def get_data(self):
(antwort_liste, headline) = self.__read_file(self.__current_year)
config = self.__readConfig()
if not antwort_liste:
# Keine vernünftigen Daten, ergo mal gucken, was die Webseite ergibt
self.__postdata['f_id_kommune'] = config['kommune']
self.__postdata['f_id_strasse'] = config['strasse']
(name, value) = self.__init(key, modus, host, headers)
if name != None and value != None:
self.__postdata[name] = value
http = httplib2.Http()
(resp, content) = http.request(url, "POST", headers = headers, body = urllib.parse.urlencode(self.__postdata))
antwort = str(content)
f = open(f'muell{self.__current_year}.csv', 'wb')
f.write(content)
f.close()
(antwort_liste, headline) = self.__read_file(self.__current_year)
tomorrow = (datetime.now() + timedelta(1)).strftime('%d.%m.%Y')
index = set();
for row in antwort_liste:
try:
if (row.count(tomorrow) > 1):
for i in range(len(row)):
if (row[i] == tomorrow and i < 4):
index.add(i)
else:
pos = row.index(tomorrow)
if pos < 4:
index.add(pos)
except ValueError:
pass
wird = 'wird' if len(index) < 2 else 'werden'
tonnen = []
for pos in index:
tonnen.append(headline[pos])
return (tonnen, wird)
def ausgabe_telegram(self, tonnen, wird):
''' Ausgabe Richtung Telegram'''
if len(tonnen) > 0:
tonnen = ' und '.join(tonnen)
# Jetzt den Telegram Bot ansprechen
try:
os.system(f'echo "Morgen {wird} {tonnen} abgeholt" | {config["telegram"]["pathBot"]} -u {config["telegram"]["tgReceiver"]}')
except Exception as e:
os.system(f'logger "Fehler Muellbot Telegram: {e}"')
def ausgabe_signal(self, tonnen, wird):
''' Ausgabe Richtung Signal'''
if len(tonnen) > 0:
tonnen = ' und '.join(tonnen)
# Signal Bot
try:
os.system(f'{config["signal"]["path"]} -a {config["signal"]["account"]} send -g {config["signal"]["group_id"]} -m "Morgen {wird} {tonnen} abgeholt"')
except Exception as e:
os.system(f'logger "Fehler Muellbot Signal: {e}"')
def ausgabe_text(self, tonnen, wird):
''' Ausgabe als Text'''
if len(tonnen) > 0:
tonnen = ' und '.join(tonnen)
print(f'Morgen {wird} {tonnen} abgeholt')

View File

@ -1,172 +1,9 @@
#!/usr/bin/python3
'''
Um die Werte für kommune und strasse zu erfahren, rufe https://www.egst.de/de/abfallabholung/ per
Browser auf und ermittle die gesendeten Daten in der Entwicklerkonsole deines Browsers.
Die Angaben hier im Skript stehen für Hörstel als Kommune und Im Wiesengrund als Strasse
Um die Daten an Telegram zu senden, musst du dir ein Skript/Programm schreiben, dass die API
von Telegram verwendet und Daten an dich oder deine Gruppe senden kann!
Beispiel für eine Konfigurationsdatei:
tgReceiver: <hier die ID der Gruppe/ des Teilnehmers eintragen>
kommune: 2601
strasse: 2146
pathBot: '~/bin/YMBot'
tgBotOwner: <hier DEINE Telegram-ID eintragen>
'''
import csv
import os
import re
import requests
import httplib2
import urllib
import yaml
from datetime import datetime, timedelta
from pathlib import Path
# Funktionen
def get_index_positions(list_of_elems, element):
''' Returns the indexes of all occurrences of given element in
the list- listOfElements '''
index_pos_list = []
for i in range(len(list_of_elems)):
if list_of_elems[i] == element:
index_pos_list.append(i)
return index_pos_list
def readConfig():
''' Liest ~/.muell.yaml und speichert Daten in Dictionary '''
home = Path.home() # home ist ohne / am Ende!
config = {}
with open(f'{home}/.muell.yaml', 'r') as config_file:
config = yaml.load(config_file)
return config
def init(key, modus, host, headers):
''' Initialisiert das System und sucht entsprechende Daten heraus '''
url = f'https://{host}/?key={key}&modus={modus}&waction=init'
http = httplib2.Http()
(resp, content) = http.request(url, "POST", headers = headers, body = urllib.parse.urlencode(postdata))
result = re.findall(r"<input .*?name=.*? value=.*?/>", str(content))
datum = result[1].split(' ')
name = None
value = None
for i in datum:
i = i.split('=')
if i[0] == 'name':
name = i[1][1:-1]
if i[0] == 'value':
value = i[1][1:-1]
return name, value
def read_file(jahr):
''' Liest Daten aus muell<jahr>.csv und liefert ggf. Ergebnis zurück'''
antwort_liste = list()
try:
with open(f'muell{jahr}.csv', 'r', encoding="latin1") as f:
csv_reader = csv.reader(f, delimiter=';')
lines = 0
antwort_liste = []
for row in csv_reader:
if lines == 0:
# print(f'Column names are {", ".join(row)}')
lines += 1
else:
if isinstance(row, list):
antwort_liste.append(row)
except FileNotFoundError:
pass
return antwort_liste
# Variablen
key = 'e21758b9c711463552fb9c70ac7d4273'
modus = 'd6c5855a62cf32a4dadbc2831f0f295f'
host = 'api.abfall.io'
url = f'https://{host}/?key={key}&modus={modus}&waction=export_csv'
export_als = f"{{'action':'{url}','target':''}}"
current_year = datetime.today().year
zeitraum = f'{current_year}0101-{current_year}1231'
postdata = {
'f_id_abfalltyp_0': '50',
'f_id_abfalltyp_1': '161',
'f_id_abfalltyp_2': '53',
'f_id_abfalltyp_3': '187',
'f_id_abfalltyp_4': '169',
'f_abfallarten_index_max': '5',
'f_abfallarten': '50,161,53,187,169',
'f_zeitraum': zeitraum,
'f_export_als': export_als,
}
headers = {
'User-Agent': 'Mozilla/5.0 (Linux; x68_64; x64; rv:88.0) Gecko/20100101 Firefox/88.0',
'Content-Type': 'application/x-www-form-urlencoded',
}
muell_arten = [
'Gelbe Tonne',
'Biomüll',
'Papiermüll',
'Restmüll',
'Schadstoffmobil',
]
from Muell import Muell
if __name__ == '__main__':
antwort_liste = read_file(current_year)
config = readConfig()
if not antwort_liste:
# Keine vernünftigen Daten, ergo mal gucken, was die Webseite ergibt
postdata['f_id_kommune'] = config['kommune']
postdata['f_id_strasse'] = config['strasse']
(name, value) = init(key, modus, host, headers)
if name != None and value != None:
postdata[name] = value
http = httplib2.Http()
(resp, content) = http.request(url, "POST", headers = headers, body = urllib.parse.urlencode(postdata))
antwort = str(content)
f = open(f'muell{current_year}.csv', 'wb')
f.write(content)
f.close()
antwort_liste = read_file(current_year)
tomorrow = (datetime.now() + timedelta(1)).strftime('%d.%m.%Y')
index = set();
for row in antwort_liste:
try:
pos = row.index(tomorrow)
if pos < 4:
index.add(pos)
except ValueError:
pass
wird = 'wird' if len(index) < 2 else 'werden'
tonnen = []
for pos in index:
tonnen.append(muell_arten[pos])
if len(tonnen) > 0:
tonnen = ' und '.join(tonnen)
# Jetzt den Bot ansprechen
os.system(f'echo "Morgen {wird} {tonnen} abgeholt" | {config["pathBot"]} -u {config["tgReceiver"]}')
muell = Muell()
tonnen, wird = muell.get_data()
muell.ausgabe_telegram(tonnen, wird)
muell.ausgabe_signal(tonnen, wird)

69
python/show-proc-data.py Normal file
View File

@ -0,0 +1,69 @@
#!/usr/bin/env python
import os
import sys
import struct
def read_proc_data(pid):
'''Liest Daten einer Prozess-ID aus /proc aus und zeigt diese an'''
proc_dir = '/proc/{0}'.format(pid)
if not os.path.exists(proc_dir):
return "Prozess mit PID {0} existiert nicht.".format(pid)
# Lese die Daten aus dem /proc-Verzeichnis
for filename in os.listdir(proc_dir):
file_path = os.path.join(proc_dir, filename)
# Ueberspringe Dateien, die keine Informationen enthalten
if filename in [".", "..", "task", "mem", "cwd", "exe", "pagemap"]:
continue
try:
if filename in ("environ", "cmdline"):
with open(file_path, "rb") as file:
environ_data = file.read()
print("{0}:".format(filename))
for line in environ_data.decode("utf-8", errors="replace").split("\x00"):
if line:
print(line)
print()
elif filename == "auxv":
with open(file_path, "rb") as file:
auxv_data = file.read()
print("{0}:".format(filename))
i = 0
while i < len(auxv_data):
entry_type, entry_value = struct.unpack("qq", auxv_data[i:i+16])
print("Type: {0}, Value: {1}".format(entry_type, entry_value))
i += 16
print()
else:
with open(file_path, "r") as file:
print("{0}:".format(filename))
for line in file:
line = line.rstrip("\n")
if line:
print(line)
print()
except (IOError, UnicodeDecodeError):
pass
return None
if __name__ == "__main__":
if len(sys.argv) != 2:
print("Bitte geben Sie die Prozess-ID (PID) als Argument an.")
sys.exit(1)
try:
pid = int(sys.argv[1])
except ValueError:
print("Ungueltige Prozess-ID (PID).")
sys.exit(1)
read_proc_data(pid)