Source code for routeros_telegraf_exporter.models

"""Models module

Abstraction layer for models
"""

import os
import threading
from datetime import datetime

import yaml
from yaml import load

try:
    from yaml import CLoader as Loader
except ImportError:
    from yaml import Loader


[docs]def load_config(config_file): here = os.path.abspath(os.path.dirname(__file__)) config_file = "{}/{}".format(os.environ.get("ROUTEROS_EXPORTER_PATH", here), config_file) try: with open(config_file) as file: data = load(file, Loader=Loader) return data except FileNotFoundError: from routeros_telegraf_exporter import DEFAULT_CONF return load(DEFAULT_CONF, Loader=Loader)
[docs]def load_hosts_from_config(config): hosts = list(map(lambda x: list(x.keys())[0], config)) if "default" in hosts: hosts.remove("default") return ",".join(hosts)
[docs]class Args(object): user = None host = None password = None port = 8728 hosts = [] sleep = 60 resource = [] ignore_interval = False def __init__(self, user=os.getenv("ROUTEROS_API_USERNAME", 'api_read_user'), password=os.getenv("ROUTEROS_API_PASSWORD", 'do_not_expose_password_here'), port=os.getenv("ROUTEROS_API_PORT", 8728), hosts=[], daemon=False, output_type="influx", resource=[], hosts_config_file=os.getenv("ROUTEROS_EXPORTER_HOSTS_CONFIG", 'hosts_config.yaml'), ignore_interval=False): self.lock = threading.Lock() self.user = user self.password = password self.hosts = hosts self.port = port self.daemon = daemon self.output_type = output_type self.resource = resource self.ignore_interval = ignore_interval self.hosts_config = load_config(hosts_config_file) if self.hosts_config: self.hosts = load_hosts_from_config(self.hosts_config)
[docs]class JsonData(object): time = None tags = dict([]) fields = dict([]) measuremenst = None def __init__(self, tags=dict([]), fields=dict([]), measurement=None): date = datetime.now() self.time = date.strftime('%Y-%m-%dT%H:%M:%SZ') self.tags = tags self.fields = fields