# funcs.py
# V0.5.0 LDO 19/10/2022: initial version
# V0.5.1 LDO 12/11/2022: refactor modules
# V0.6.1 LDO 17/12/2022: release candidate 1
'''
grafanacode: utility functions
'''
#******************************************************************************
# EXTERNAL MODULE REFERENCES
#******************************************************************************
import os
import importlib
import math
import requests
#******************************************************************************
# GENERAL HELPER FUNCTIONS
#******************************************************************************
[docs]def generateUID(alphabet=None, size=None):
'''
Get an UID; credits: https://github.com/puyuan/py-nanoid
Parameters:
alphabet (string): a string of characters allowed in the UID, defaults to '_-0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ'
size (int): the length of the UIT to generate, defaults to 9
Returns:
string: generated UID
'''
if alphabet is None:
alphabet = '_-0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ'
alphabet_len = len(alphabet)
if size is None:
size = 9
uid = ''
mask = 1
if alphabet_len > 1:
mask = (2 << int(math.log(alphabet_len - 1) / math.log(2))) - 1
steps = int(math.ceil(1.6 * mask * size / alphabet_len))
while True:
random_bytes = bytearray(os.urandom(steps))
for step in range(steps):
random_byte = random_bytes[step] & mask
if random_byte < alphabet_len:
if alphabet[random_byte]:
uid += alphabet[random_byte]
if len(id) >= size:
return uid
#******************************************************************************
[docs]def getJsonKey(jsondict):
'''
Get the key from a JSON dict if it exists, otherwise None
Parameters:
jsondict (dict): json source
Returns:
string: dict key or None
'''
if type(jsondict) != dict:
return None
if jsondict == {}:
return None
return list(jsondict.keys())[0]
#******************************************************************************
[docs]def addJsonItem(jsondict, keylist, value):
'''
Add an element to a JSON, eventually create parent levels
Parameters:
jsondict (dict): json where to add to (will be modified by function
keylist (list): list with keys on subsequent levels
value (any): value to add, can be bool, str, int, float, list, dict, obj, ...
Returns:
nothing
'''
if type(jsondict) is not dict:
raise Exception(f'json is not a dict but:{type(jsondict)}')
elif type(keylist) is not list:
raise Exception(f'keylist is not a list but:{type(keylist)}')
elif len(keylist) == 0:
raise Exception(f'keylist empty; keylist:{keylist}')
else:
if len(keylist) == 1:
jsondict[keylist[0]] = value
else:
hjson = jsondict
for key in keylist[:-1]:
if key not in hjson:
hjson[key] = {}
hjson = hjson[key]
hjson[keylist[-1]] = value
#******************************************************************************
[docs]def getJsonItem(jsondict, keylist, default='KEYERROR'):
'''
Get an element from a JSON if it exists, otherwise None
Parameters:
jsondict (dict): json source
keylist (list): list with keys on subsequent levels
default (any): KEYERROR raises exception, else default if not found
Returns:
any: found value or default value
'''
if len(keylist) == 0:
if default == 'KEYERROR':
raise Exception(f'key not found; keylist:{keylist}')
return default
else:
if type(jsondict) != dict:
raise Exception(f'json is not a dict; json:{jsondict}')
if keylist[0] in jsondict:
if len(keylist) == 1:
return jsondict[keylist[0]]
else:
return getJsonItem(jsondict[keylist[0]], keylist[1:], default)
else:
if default == 'KEYERROR':
raise Exception(f'key not found; keylist:{keylist}')
return default
#******************************************************************************
[docs]def cutJsonItem(jsondict, keylist, default='KEYERROR'):
'''
Get an element from a JSON if it exists and delete it , otherwise None
Parameters:
jsondict (dict): json source; the found value will be removed
keylist (list): list with keys on subsequent levels
default (any): KEYERROR raises exception, else default if not found
Returns:
any: found value or default value
'''
if len(keylist) == 0:
if default == 'KEYERROR':
raise Exception(f'key not found; keylist:{keylist}')
return default
else:
if keylist[0] in jsondict:
if len(keylist) == 1:
return jsondict.pop(keylist[0], default)
else:
return cutJsonItem(jsondict[keylist[0]], keylist[1:], default)
else:
if default == 'KEYERROR':
raise Exception(f'key not found; keylist:{keylist}')
return default
#******************************************************************************
[docs]def filterByProperty(inlist, propertyname, value):
'''
Filter from list by property; return a new list with the items where named property has requested value. E.g. ::
filterByProperty([{'propX': value_1, 'a': 1}, {'propY': value_2, 'a': 2}, {'propX': value_1, 'a': 3}, {'propX': value_2, 'a': 4}], 'propX', value_1}
returns: [{'propX': value_1, 'a': 1}, {'propX': value_1, 'a': 3}]
Parameters:
inlist (list): source list
propertyname (str): property name
value (any): value to compare with
Returns:
string: dict key or None
'''
retval = []
for item in inlist:
if propertyname in item:
if item[propertyname]==value:
retval.append(item)
return retval
#**********************************************************
[docs]def deepMerge(destination, dict2add):
'''
Recursive merge the additional dictionary in the destination dictionary, overwrite if key exists.
Parameters:
destination (dict): dictionary, will be modified
dict2add (dict): dictionary to merge, also on sublevels.
Returns:
nothing
'''
# pylint: disable=too-many-branches
if dict2add is None: # nothing to dict2add
return
if destination is None: # no destination, so simply replace
destination = dict2add
return
if isinstance(destination, (bool, int, float, str)): # destination is simple type: replace
destination = dict2add
return
if isinstance (destination, list): # destination is list: can dict2add to list
if isinstance (dict2add, list): # check if we want to merge lists
if dict2add != []: # then list to merge can't be empty
destination += dict2add
else: # we want to append another type to list
destination[key].append(dict2add[key]) # TODO: Using variable 'key' before assignment (used-before-assignment)
return
if isinstance (destination, dict): # destination is dict: can dict2add to dict
if isinstance (dict2add, dict): # check if we want to merge dicts
if dict2add != {}: # then dict to merge can't be empty
for key in dict2add: # iterate over all items in dict2add
if key not in destination: # if key doesn't exist in destination then simply dict2add
destination[key] = dict2add[key]
destination[key] = dict2add[key]
else: # if key does exist in destination then merge (iterate)
deepMerge(destination[key], dict2add[key])
else: # we can only merge dicts
raise Exception(f'{dict2add} is not a dict but a {type(dict2add)}')
#**********************************************************
[docs]def cleanFilename(source):
'''
Remove invalid characters ' %:/,.\\[]<>*?' from filename
Parameters:
source (str): input name
Returns:
string: source converted to valid filename
'''
remove = ' %:/,.\\[]<>*?'
return ''.join([c for c in source if c not in remove])
#******************************************************************************
# AUX FUNCTIONS
#******************************************************************************
[docs]def getPlugins():
'''
Get all plugins available in the /plugins map
Parameters:
nothing
Returns:
dict: dict with all available plugins
'''
# pylint: disable=import-outside-toplevel
path = os.path.dirname(__file__) + '/plugins'
files = os.listdir(path)
plugins = {'targets': [], 'dashboards': [], 'panels': [],}
from inspect import getmembers, isfunction
for fname in files:
if len(fname)<8:
continue
if fname == '__init__.py' or fname[-3:]!='.py':
continue
name = fname[:-3]
if len(name)>7 and name[:7] == 'target_':
plugins['targets'].append(name)
elif len(name)>10 and name[:10] == 'dashboard_':
plugins['dashboards'].append(name)
elif len(name)>6 and name[:6] == 'panel_':
plugins['panels'].append(name)
return plugins
#**********************************************************
#**********************************************************
[docs]def printPlugins():
'''
Get and print all plugins available in the /plugins map
'''
plugins = getPlugins()
print('\n available plugins:')
print('-------------------')
print(' Targets:')
for item in plugins['targets']:
print(f' {item}')
print('-------------------')
print(' Dashboards:')
for item in plugins['dashboards']:
print(f' {item}')
print('-------------------')
print(' Panels:')
for item in plugins['panels']:
print(f' {item}')
print('-------------------')
#**********************************************************
# Print all plugins