import elasticsearch as es
import elasticsearch_dsl as es_dsl
import datetime as dt
import time
import math
import json
import re
import inspect
import logging
import os
import requests
from lobster.util import Configurable, PartiallyMutable
import lobster
logger = logging.getLogger('lobster.monitor.elk')
def dictify(thing, skip=None):
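    """Convert an object's attributes into a dict, dropping routine
    (callable) members and ``__``-prefixed names.

    ``skip`` may be a single attribute name or an iterable of names to
    drop from the result; missing names are ignored. For example, for an
    object ``o`` with attributes ``a=1`` and ``b=2``,
    ``dictify(o, skip='b')`` returns ``{'a': 1}``.
    """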
thing = dict([(m, o) for (m, o) in inspect.getmembers(thing)
if not inspect.isroutine(o) and not m.startswith('__')])
if isinstance(skip, basestring):
try:
thing.pop(skip)
except KeyError:
pass
    elif skip is not None:
for key in skip:
try:
thing.pop(key)
except KeyError:
pass
return thing
def nested_paths(d):
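    """List all leaf paths of a nested dict in dotted notation.

    For example, ``nested_paths({'a': {'b': 1, 'c': 2}})`` returns
    ``['a.b', 'a.c']`` (order follows dict iteration order).
    """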
def get_paths(d, parent=[]):
if not isinstance(d, dict):
return [tuple(parent)]
else:
return reduce(list.__add__,
[get_paths(v, parent + [k])
for k, v in d.items()], [])
return ['.'.join(path) for path in get_paths(d)]
def nested_set(d, path, value):
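    """Set a value in a nested dict via a dotted path, creating
    intermediate dicts as needed.

    For example, ``nested_set(d, 'a.b', 1)`` on an empty dict ``d``
    leaves ``d == {'a': {'b': 1}}``.
    """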
keys = path.split('.')
for key in keys[:-1]:
d = d.setdefault(key, {})
d[keys[-1]] = value
def nested_get(d, path):
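    """Fetch a value from a nested dict via a dotted path, returning
    ``None`` if any component is missing.

    For example, ``nested_get({'a': {'b': 1}}, 'a.b')`` returns ``1``.
    """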
keys = path.split('.')
for key in keys:
d = d.get(key)
if d is None:
break
return d
class ElkInterface(Configurable):
"""
Enables ELK stack monitoring for the current Lobster run using an existing
Elasticsearch cluster.
    Attributes modifiable at runtime:
* `es_host`
* `es_port`
* `kib_host`
* `kib_port`
* `dashboards`
* `refresh_interval`
Parameters
----------
es_host : str
Host running Elasticsearch cluster.
es_port : int
Port number running Elasticsearch HTTP service.
kib_host : str
Host running Kibana instance connected to Elasticsearch cluster.
kib_port : int
Port number running Kibana HTTP service.
    user : str
        User ID to label Elasticsearch indices and Kibana objects;
        taken from the ``USER`` environment variable rather than passed
        to the constructor.
project : str
Project ID to label Elasticsearch indices and Kibana objects.
dashboards : list
List of dashboards to include from the Kibana templates. Defaults
to including only the core dashboard. Available dashboards: Core,
Advanced, Tasks.
refresh_interval : int
Refresh interval for Kibana dashboards, in seconds. Defaults to
300 seconds = 5 minutes.
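
    Examples
    --------
    A minimal sketch; hosts, ports, and the project name are
    placeholders::

        elk = ElkInterface('localhost', 9200, 'localhost', 5601,
                           'myproject', dashboards=['Core'])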
"""
_mutable = {'es_host': ('config.elk.update_client', [], False),
'es_port': ('config.elk.update_client', [], False),
'kib_host': ('config.elk.update_kibana', [], False),
'kib_port': ('config.elk.update_kibana', [], False),
'dashboards': ('config.elk.update_kibana', [], False),
'refresh_interval': ('config.elk.update_links', [], False)}
def __init__(self, es_host, es_port, kib_host, kib_port, project,
dashboards=None, refresh_interval=300):
self.es_host = es_host
self.es_port = es_port
self.kib_host = kib_host
self.kib_port = kib_port
self.user = os.environ['USER']
self.project = project
self.dashboards = dashboards or ['Core']
self.refresh_interval = refresh_interval
self.prefix = ('[' + self.user + '_' + self.project + ']').lower()
self.categories = []
self.start_time = None
self.end_time = None
self.previous_stats = {}
self.template_dir = os.path.join(os.path.dirname(
os.path.abspath(lobster.__file__)), 'monitor', 'elk', 'data')
self.client = es.Elasticsearch([{'host': self.es_host,
'port': self.es_port}])
pattern = r'[\\\/*\?\"\,\<\>\|]'
if re.search(pattern, project):
raise ValueError("illegal project {}; must not match {}".format(project, pattern))
# FIXME: supposed to check that the Elasticsearch client exists
# and cause the config file to be rejected if it doesn't
# but breaks lobster with an sqlite3 error in unit.py
# so we check in self.create() instead, which fails more quietly
#
# self.check_client()
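    # The Elasticsearch client holds live connections and cannot be
    # pickled; __getstate__ drops it and __setstate__ rebuilds it from
    # the stored host and port.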
def __getstate__(self):
state = dict(self.__dict__)
del state['client']
return state
def __setstate__(self, state):
self.__dict__.update(state)
with PartiallyMutable.unlock():
self.client = es.Elasticsearch([{'host': self.es_host,
'port': self.es_port}])
def create(self, categories):
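        """Start monitoring: record the start time, (re)create the
        Elasticsearch indices from the bundled mapping, and generate
        the Kibana objects for the configured dashboards.
        """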
with PartiallyMutable.unlock():
self.start_time = dt.datetime.utcnow()
self.categories = categories
self.n_categories = len(categories)
workflows = []
for category in self.categories:
workflows += self.categories[category]
workflows = set(workflows)
self.n_workflows = len(workflows)
self.check_client()
try:
if self.client.indices.exists(self.prefix + '*'):
logger.info("Elasticsearch indices with prefix " +
self.prefix + " already exist")
self.delete_elasticsearch()
with open(os.path.join(self.template_dir, 'mapping') +
'.json', 'r') as f:
mapping = json.load(f)
logger.info("creating Elasticsearch indices")
index_suffixes = ['_lobster_tasks', '_lobster_task_logs',
'_lobster_stats', '_lobster_summaries',
'_monitor_data']
for suffix in index_suffixes:
self.client.indices.create(
index=self.prefix + suffix, body=mapping)
# try removing mapping from ES
time.sleep(5)
except Exception as e:
logger.error(e)
self.update_kibana()
logger.info("beginning ELK monitoring")
def end(self):
logger.info("ending ELK monitoring")
with PartiallyMutable.unlock():
self.end_time = dt.datetime.utcnow()
self.update_links()
def resume(self):
logger.info("resuming ELK monitoring")
with PartiallyMutable.unlock():
self.end_time = None
self.update_links()
def cleanup(self):
self.delete_kibana()
self.delete_elasticsearch()
def check_client(self):
logger.info("checking Elasticsearch client")
try:
logger.info("cluster health: " + self.client.cat.health())
except es.exceptions.ElasticsearchException as e:
logger.error(e)
raise AttributeError("could not connect to Elasticsearch cluster" +
" {0}:{1}".format(self.es_host, self.es_port))
def update_client(self):
with PartiallyMutable.unlock():
self.client = es.Elasticsearch([{'host': self.es_host,
'port': self.es_port}])
def init_histogram_intervals(self):
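        """Seed the ``intervals`` document in the ``_monitor_data``
        index from the bundled template, substituting this run's prefix
        into the visualization IDs and carrying over any intervals
        stored by a previous run.
        """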
logger.info("initializing dynamic histogram intervals")
try:
with open(os.path.join(self.template_dir, 'intervals') +
'.json', 'r') as f:
intervals = json.load(f)
vis_id_paths = [path for path in nested_paths(intervals)
if path.endswith('.vis_ids')]
for path in vis_id_paths:
vis_ids = [vis_id.replace('[template]', self.prefix)
for vis_id in nested_get(intervals, path)]
nested_set(intervals, path, vis_ids)
search = es_dsl.Search(
using=self.client, index=self.prefix + '_monitor_data') \
.filter('match', _type='fields') \
.filter('match', _id='intervals') \
.extra(size=1)
response = search.execute()
if len(response) > 0:
old_intervals = response[0].to_dict()
fields = ['.'.join(path.split('.')[:-1])
for path in nested_paths(old_intervals)
if path.endswith('interval')]
for field in fields:
                    nested_set(intervals, field,
                               nested_get(old_intervals, field))
self.client.index(index=self.prefix + '_monitor_data',
doc_type='fields', id='intervals',
body=intervals)
except Exception as e:
logger.error(e)
def download_templates(self):
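        """Export this run's Kibana index patterns, dashboards, and
        visualizations back into the template directory, replacing the
        run prefix with the ``[template]`` placeholder and resetting
        run-specific values (histogram intervals, table sizes).
        """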
logger.info("getting Kibana objects with prefix " + self.prefix)
logger.info("getting index patterns")
index_dir = os.path.join(self.template_dir, 'index')
try:
try:
os.mkdir(self.template_dir)
os.mkdir(os.path.join(self.template_dir, 'index'))
except OSError:
pass
search_index = es_dsl.Search(using=self.client, index='.kibana') \
.filter('prefix', _id=self.prefix) \
.filter('match', _type='index-pattern') \
.extra(size=10000)
response_index = search_index.execute()
for index in response_index:
index.meta.id = index.meta.id \
.replace(self.prefix, '[template]')
index.title = index.title.replace(self.prefix, '[template]')
with open(os.path.join(index_dir, index.meta.id) + '.json',
'w') as f:
f.write(json.dumps(index.to_dict(), indent=4,
sort_keys=True))
f.write('\n')
except Exception as e:
logger.error(e)
dash_dir = os.path.join(self.template_dir, 'dash')
intervals = {}
for name in self.dashboards:
logger.info("getting " + name + " dashboard")
vis_ids = []
try:
try:
os.mkdir(os.path.join(dash_dir))
except OSError:
pass
search_dash = es_dsl.Search(
using=self.client, index='.kibana') \
.filter('match', _id=self.prefix + '-' + name) \
.filter('match', _type='dashboard') \
.extra(size=1)
dash = search_dash.execute()[0]
dash.meta.id = dash.meta.id \
.replace(self.prefix, '[template]')
dash.title = dash.title.replace(self.prefix, '[template]')
dash_panels = json.loads(dash.panelsJSON)
for panel in dash_panels:
vis_ids.append(panel['id'])
panel['id'] = panel['id'].replace(
self.prefix, '[template]')
dash.panelsJSON = json.dumps(dash_panels, sort_keys=True)
with open(os.path.join(dash_dir, dash.meta.id) + '.json',
'w') as f:
f.write(json.dumps(dash.to_dict(), indent=4,
sort_keys=True))
f.write('\n')
except Exception as e:
logger.error(e)
logger.info("getting " + name + " visualizations")
vis_dir = os.path.join(self.template_dir, 'vis')
try:
os.mkdir(vis_dir)
except OSError:
pass
for vis_id in vis_ids:
try:
search_vis = es_dsl.Search(
using=self.client, index='.kibana') \
.filter('match', _id=vis_id) \
.filter('match', _type='visualization') \
.extra(size=1)
vis = search_vis.execute()[0]
vis.meta.id = vis.meta.id \
.replace(self.prefix, '[template]')
vis.title = vis.title \
.replace(self.prefix, '[template]')
vis_state = json.loads(vis.visState)
vis_state['title'] = vis['title']
if vis_state['type'] == 'markdown':
vis_state['params']['markdown'] = "text goes here"
else:
vis_source = json.loads(
vis.kibanaSavedObjectMeta.searchSourceJSON)
vis_source['index'] = vis_source['index'].replace(
self.prefix, '[template]')
if vis_state['type'] == 'histogram':
hist_aggs = [agg for agg in vis_state['aggs']
if agg['type'] == 'histogram']
for agg in hist_aggs:
agg['params']['interval'] = 1e10
field_path = agg['params']['field']
filter_words = vis_source['query']['query_string']['query'].split(' ')
filter_found = False
for i, word in enumerate(filter_words):
if word.startswith(field_path + ':>='):
filter_words[i] = \
field_path + ':>=0'
filter_found = True
elif word.startswith(field_path + ':<='):
filter_words[i] = \
field_path + ':<=0'
filter_found = True
if not filter_found:
if len(filter_words) > 0:
filter_words += \
['AND', field_path + ':>=0',
'AND', field_path + ':<=0']
else:
filter_words = \
[field_path + ':>=0',
'AND', field_path + ':<=0']
vis_source['query']['query_string']['query'] =\
' '.join(filter_words)
                            interval_vis_ids = nested_get(
                                intervals, field_path + '.vis_ids')
                            if interval_vis_ids:
                                if vis.meta.id not in interval_vis_ids:
                                    interval_vis_ids.append(vis.meta.id)
                            else:
                                interval_vis_ids = [vis.meta.id]
                            hist_data = {
                                'interval': None,
                                'min': None,
                                'max': None,
                                'vis_ids': interval_vis_ids
                            }
                            nested_set(intervals, field_path, hist_data)
elif vis_state['type'] == 'table':
                            if vis.meta.id in (
                                    '[template]-Category-summary',
                                    '[template]-Workflow-summary'):
                                vis_state['params']['perPage'] = 0
                                aggs = [agg for agg in vis_state['aggs']
                                        if 'params' in agg and
                                        'size' in agg['params']]
                                for agg in aggs:
                                    agg['params']['size'] = 0
vis.kibanaSavedObjectMeta.searchSourceJSON = \
json.dumps(vis_source, sort_keys=True)
vis.visState = json.dumps(vis_state, sort_keys=True)
with open(os.path.join(vis_dir, vis.meta.id) +
'.json', 'w') as f:
f.write(json.dumps(vis.to_dict(), indent=4,
sort_keys=True))
f.write('\n')
except Exception as e:
logger.error(e)
try:
with open(os.path.join(self.template_dir, 'intervals') +
'.json', 'w') as f:
f.write(json.dumps(intervals, indent=4, sort_keys=True))
except Exception as e:
logger.error(e)
def update_kibana(self):
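        """Generate this run's Kibana index patterns, dashboards, and
        visualizations from the saved templates, substituting the run
        prefix and sizing the summary tables to the number of
        categories and workflows.
        """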
logger.info("generating Kibana objects from templates")
logger.debug("generating index patterns")
try:
index_dir = os.path.join(self.template_dir, 'index')
for index_path in os.listdir(index_dir):
with open(os.path.join(index_dir, index_path)) as f:
index = json.load(f)
index['title'] = index['title'] \
.replace('[template]', self.prefix)
index_id = index_path.replace('[template]', self.prefix) \
.replace('.json', '')
self.client.index(index='.kibana', doc_type='index-pattern',
id=index_id, body=index)
except Exception as e:
logger.error(e)
for name in self.dashboards:
logger.debug("generating " + name + " dashboard")
vis_paths = []
try:
dash_dir = os.path.join(self.template_dir, 'dash')
dash_path = '[template]-{}.json'.format(name)
with open(os.path.join(dash_dir, dash_path)) as f:
dash = json.load(f)
dash['title'] = dash['title'] \
.replace('[template]', self.prefix)
dash_panels = json.loads(dash['panelsJSON'])
for panel in dash_panels:
vis_paths.append(panel['id'] + '.json')
panel['id'] = panel['id'] \
.replace('[template]', self.prefix)
dash['panelsJSON'] = json.dumps(dash_panels, sort_keys=True)
dash_id = dash_path.replace('[template]', self.prefix)[:-5]
self.client.index(index='.kibana', doc_type='dashboard',
id=dash_id, body=dash)
except Exception as e:
logger.error(e)
logger.debug("generating " + name + " visualizations")
vis_dir = os.path.join(self.template_dir, 'vis')
for vis_path in vis_paths:
try:
with open(os.path.join(vis_dir, vis_path)) as f:
vis = json.load(f)
vis['title'] = vis['title'] \
.replace('[template]', self.prefix)
vis_id = \
vis_path.replace('[template]', self.prefix)[:-5]
vis_state = json.loads(vis['visState'])
                if vis_state['type'] != 'markdown':
                    source = json.loads(
                        vis['kibanaSavedObjectMeta']['searchSourceJSON'])
                    source['index'] = source['index'] \
                        .replace('[template]', self.prefix)
                    vis['kibanaSavedObjectMeta']['searchSourceJSON'] = \
                        json.dumps(source, sort_keys=True)
                    if vis_state['type'] == 'table':
                        # vis_id already carries the run prefix at this
                        # point, so compare against the prefixed names
                        if vis_id == self.prefix + '-Category-summary':
                            vis_state['params']['perPage'] = \
                                self.n_categories
                            aggs = [agg for agg in vis_state['aggs']
                                    if 'params' in agg and
                                    'size' in agg['params']]
                            for agg in aggs:
                                agg['params']['size'] = self.n_categories
                        elif vis_id == self.prefix + '-Workflow-summary':
                            vis_state['params']['perPage'] = \
                                self.n_workflows
                            aggs = [agg for agg in vis_state['aggs']
                                    if 'params' in agg and
                                    'size' in agg['params']]
                            for agg in aggs:
                                agg['params']['size'] = self.n_workflows
                vis['visState'] = json.dumps(vis_state, sort_keys=True)
self.client.index(index='.kibana',
doc_type='visualization',
id=vis_id, body=vis)
except Exception as e:
logger.error(e)
self.update_links()
self.init_histogram_intervals()
def update_links(self):
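        """Build Kibana dashboard URLs carrying a ``_g`` time filter
        (an absolute range once the run has ended, auto-refreshing
        otherwise) and regenerate the markdown link widgets.
        """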
logger.debug("generating dashboard links")
dash_links = {}
try:
link_prefix = "http://{0}:{1}/app/" \
.format(self.kib_host, self.kib_port)
if self.end_time:
time_filter = requests.utils.quote(
("_g=(refreshInterval:" +
"(display:Off,pause:!f,section:0,value:0),time:" +
"(from:'{0}Z',mode:absolute,to:'{1}Z'))")
.format(self.start_time, self.end_time),
safe='/:!?,&=#')
else:
time_filter = requests.utils.quote(
("_g=(refreshInterval:" +
"(display:'{0} seconds',pause:!f,section:2," +
"value:{1}),time:(from:'{2}Z',mode:absolute,to:now))")
.format(self.refresh_interval,
int(self.refresh_interval * 1e3),
self.start_time),
safe='/:!?,&=#')
dash_dir = os.path.join(self.template_dir, 'dash')
for name in self.dashboards:
dash_path = '[template]-{}.json'.format(name)
with open(os.path.join(dash_dir, dash_path)) as f:
dash = json.load(f)
dash_id = dash_path.replace('[template]', self.prefix)[:-5]
dash['title'] = dash['title'] \
.replace('[template]', self.prefix)
link = requests.utils.quote(
"kibana#/dashboard/{0}".format(dash_id),
safe='/:!?,&=#')
logger.info("Kibana {0} dashboard at {1}{2}?{3}"
.format(name, link_prefix, link, time_filter))
dash_links[name] = link
except Exception as e:
logger.error(e)
logger.debug("generating shared link widget")
try:
            shared_links_text = "####Dashboards\n"
for name in dash_links:
shared_links_text += "- [{0}]({1})\n" \
.format(name, dash_links[name])
task_log_link = requests.utils.quote(
("kibana#/discover?_a=(columns:" +
"!(Task.id,TaskUpdate.exit_code,Task.log),index:" +
"{0}_lobster_tasks,interval:auto,query:(query_string:" +
"(analyze_wildcard:!t,query:'!!TaskUpdate.exit_code:0'" +
")),sort:!(_score,desc))")
.format(self.prefix),
safe='/:!?,&=#')
shared_links_text += "\n####[Failed task logs]({0})\n" \
.format(task_log_link)
with open(os.path.join(self.template_dir, 'vis',
'[template]-Links') + '.json',
'r') as f:
shared_links_vis = json.load(f)
shared_links_vis['title'] = shared_links_vis['title'].replace(
'[template]', self.prefix)
shared_links_state = json.loads(shared_links_vis['visState'])
shared_links_state['params']['markdown'] = shared_links_text
shared_links_vis['visState'] = json.dumps(shared_links_state,
sort_keys=True)
self.client.index(index='.kibana', doc_type='visualization',
id=self.prefix + "-Links",
body=shared_links_vis)
except Exception as e:
logger.error(e)
for name in dash_links:
logger.debug("generating " + name + " link widget")
try:
                links_text = "####Category filters\n"
all_filter = requests.utils.quote(
"&_a=(query:(query_string:(analyze_wildcard:!t,query:" +
"'_missing_:category OR category:all')))",
safe='/:!?,&=#')
links_text += "- [all]({0}?{1})\n" \
.format(dash_links[name], all_filter)
merge_filter = requests.utils.quote(
"&_a=(query:(query_string:(analyze_wildcard:!t,query:" +
"'_missing_:Task.category OR Task.category:merge')))",
safe='/:!?,&=#')
links_text += "- [merge]({0}?{1})\n" \
.format(dash_links[name], merge_filter)
for category in self.categories:
cat_filter = requests.utils.quote(
("_a=(query:(query_string:(analyze_wildcard:!t," +
"query:'(_missing_:Task.category AND " +
"_missing_:TaskUpdate AND _missing_:category) " +
"OR category:{0} OR Task.category:{0}')))")
.format(category), safe='/:!?,&=#')
links_text += "- [{0}]({1}?{2})\n" \
.format(category, dash_links[name], cat_filter)
links_text += "\n####[Reset time range]({0}?{1})\n" \
.format(dash_links[name], time_filter)
with open(os.path.join(self.template_dir, 'vis',
'[template]-{0}-links'
.format(name)) + '.json',
'r') as f:
links_vis = json.load(f)
links_vis['title'] = links_vis['title'].replace(
'[template]', self.prefix)
links_state = json.loads(links_vis['visState'])
links_state['params']['markdown'] = links_text
links_vis['visState'] = json.dumps(links_state,
sort_keys=True)
self.client.index(index='.kibana', doc_type='visualization',
id='{0}-{1}-links'
.format(self.prefix, name),
body=links_vis)
except Exception as e:
logger.error(e)
def delete_kibana(self):
logger.info("deleting Kibana objects with prefix " + self.prefix)
try:
search = es_dsl.Search(using=self.client, index='.kibana') \
.filter('prefix', _id=self.prefix) \
.extra(size=10000)
response = search.execute()
for result in response:
self.client.delete(index='.kibana',
doc_type=result.meta.doc_type,
id=result.meta.id)
except Exception as e:
logger.error(e)
def delete_elasticsearch(self):
logger.info("deleting Elasticsearch indices with prefix " +
self.prefix)
try:
self.client.indices.delete(index=self.prefix + '*')
except es.exceptions.ElasticsearchException as e:
logger.error(e)
def unroll_cumulative_fields(self, log, previous, fields):
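        """Convert cumulative counters into per-interval differences.

        For each dotted path in ``fields``, subtract the value recorded
        in ``previous`` from the value in ``log`` and store the
        non-negative difference under ``<field>_diff``; datetimes are
        compared as epoch seconds. For example, if ``log`` holds
        ``workers_joined = 7`` and ``previous`` holds ``4``, the result
        gains ``workers_joined_diff = 3``.
        """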
logger.debug("unrolling cumulative fields")
for field in fields:
try:
cur_val = nested_get(log, field)
old_val = nested_get(previous, field)
if isinstance(cur_val, dt.date):
cur_val = int(cur_val.strftime('%s'))
old_val = int(
dt.datetime.strptime(old_val, '%Y-%m-%dT%H:%M:%S')
.strftime('%s'))
if old_val is not None:
if '.' in field:
field_parts = field.split('.')
new_field = '.'.join(field_parts[:-1] +
[field_parts[-1] + '_diff'])
else:
new_field = field + '_diff'
new_val = max(cur_val - old_val, 0)
nested_set(log, new_field, new_val)
except Exception as e:
logger.error(e)
return log
def update_histogram_bins(self, log, log_type):
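        """Track the observed min and max of each histogrammed field
        and rewrite the affected Kibana visualizations so each
        histogram spans the data in roughly 20 bins (interval =
        ceil((max - min) / 20), or 1 when min equals max).
        """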
logger.debug("updating " + log_type + " histogram bins")
try:
search = es_dsl.Search(
using=self.client, index=self.prefix + '_monitor_data') \
.filter('match', _type='fields') \
.filter('match', _id='intervals') \
.extra(size=1)
intervals = search.execute()[0].to_dict()
fields = ['.'.join(path.split('.')[:-1])
for path in nested_paths(intervals[log_type])
if path.endswith('interval')]
for field in fields:
cur_val = nested_get(log, field)
                if cur_val is None:
                    # skip fields missing from this log entry rather
                    # than abandoning the remaining fields
                    continue
field_path = log_type + '.' + field
intervals_field = nested_get(intervals, field_path)
changed = False
if intervals_field['interval'] is None:
intervals_field['min'] = cur_val
intervals_field['max'] = cur_val
changed = True
else:
if cur_val < intervals_field['min']:
intervals_field['min'] = cur_val
changed = True
elif cur_val > intervals_field['max']:
intervals_field['max'] = cur_val
changed = True
if changed:
if intervals_field['min'] == intervals_field['max']:
intervals_field['interval'] = 1
else:
intervals_field['interval'] = \
math.ceil((intervals_field['max'] -
intervals_field['min']) / 20.0)
for vis_id in intervals_field['vis_ids']:
search_vis = es_dsl.Search(
using=self.client, index='.kibana') \
.filter('match', _id=vis_id) \
.filter('match', _type='visualization') \
.extra(size=1)
vis = search_vis.execute()[0]
vis_state = json.loads(vis.visState)
for agg in vis_state['aggs']:
if agg['type'] == 'histogram' and \
agg['params']['field'] == field_path:
agg['params']['interval'] = \
intervals_field['interval']
vis.visState = json.dumps(vis_state, sort_keys=True)
vis_source = json.loads(
vis.kibanaSavedObjectMeta.searchSourceJSON)
filter_words = vis_source['query']['query_string']['query'].split(' ')
for i, word in enumerate(filter_words):
if word.startswith(field_path + ':>='):
filter_words[i] = field_path + ':>=' + \
str(intervals_field['min'])
elif word.startswith(field_path + ':<='):
filter_words[i] = field_path + ':<=' + \
str(intervals_field['max'])
vis_source['query']['query_string']['query'] =\
' '.join(filter_words)
vis.kibanaSavedObjectMeta.searchSourceJSON = \
json.dumps(vis_source, sort_keys=True)
self.client.index(index='.kibana',
doc_type='visualization',
id=vis_id, body=vis.to_dict())
nested_set(intervals, log_type + '.' + field, intervals_field)
self.client.index(index=self.prefix + '_monitor_data',
doc_type='fields', id='intervals',
body=intervals)
except Exception as e:
logger.error(e)
def index_task(self, task):
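        """Flatten a Task object into a dict, convert its timestamps,
        extract the task log (recording any fatal exception it
        contains), and upsert the results into the ``_lobster_tasks``
        and ``_lobster_task_logs`` indices.
        """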
logger.debug("parsing Task object")
try:
            task = dictify(task, skip='_task')
            task['resources_requested'] = dictify(
                task['resources_requested'], skip='this')
            task['resources_measured'] = dictify(
                task['resources_measured'], skip=('this', 'peak_times'))
            task['resources_allocated'] = dictify(
                task['resources_allocated'], skip='this')
task['resources_measured']['cpu_wall_ratio'] = \
task['resources_measured']['cpu_time'] / \
float(task['resources_measured']['wall_time'])
except Exception as e:
logger.error(e)
return
logger.debug("parsing Task timestamps")
try:
timestamp_keys = ['send_input_start',
'send_input_finish',
'execute_cmd_start',
'execute_cmd_finish',
'receive_output_start',
'receive_output_finish',
'submit_time',
'finish_time',
'resources_measured.start',
'resources_measured.end']
for key in timestamp_keys:
nested_set(task, key, dt.datetime.utcfromtimestamp(
float(str(nested_get(task, key))[:10])))
except Exception as e:
logger.error(e)
logger.debug("parsing Task log")
try:
task_log = task.pop('output')
task['log'] = requests.utils.quote(
("kibana#/doc/{0}_lobster_tasks/" +
"{0}_lobster_task_logs/log?id={1}")
.format(self.prefix, task['id']),
safe='/:!?,&=#')
e_p = re.compile(
r"(Begin Fatal Exception[\s\S]*End Fatal Exception)")
e_match = e_p.search(task_log)
if e_match:
logger.debug("parsing fatal exception")
task['fatal_exception'] = \
{'message': e_match.group(1).replace('>> cmd: ', '')}
e_cat_p = re.compile(r"'(.*)'")
task['fatal_exception']['category'] = \
e_cat_p.search(task['fatal_exception']['message']).group(1)
except Exception as e:
logger.error(e)
logger.debug("sending Task documents to Elasticsearch")
try:
task_doc = {'doc': {'Task': task},
'doc_as_upsert': True}
self.client.update(index=self.prefix + '_lobster_tasks',
doc_type='task', id=task['id'],
body=task_doc)
log_doc = {'text': task_log}
self.client.index(index=self.prefix + '_lobster_task_logs',
doc_type='log', id=task['id'],
body=log_doc)
except Exception as e:
logger.error(e)
def index_task_update(self, task_update):
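        """Derive per-task metrics from a TaskUpdate (output size,
        bandwidth, CPU efficiency, human-readable status), split the
        raw timestamps into named time intervals, update the histogram
        bins, and upsert the document into the ``_lobster_tasks``
        index.
        """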
logger.debug("parsing TaskUpdate object")
try:
task_update = dict(task_update.__dict__)
task_update['megabytes_output'] = \
task_update['bytes_output'] / 1024.0 ** 2
task_update['allocated_disk_MB'] = \
task_update['allocated_disk'] / 1024.0
task_update['allocated_memory_MB'] = \
task_update['allocated_memory'] / 1024.0
            if task_update['time_on_worker'] != 0:
                task_update['bandwidth'] = \
                    task_update['network_bytes_received'] / 1e6 / \
                    task_update['time_on_worker']
            if (task_update['cores'] != 0 and
                    task_update['time_processing_end'] != 0 and
                    task_update['time_prologue_end'] != 0):
                task_update['percent_efficiency'] = \
                    task_update['time_cpu'] * 100 / \
                    (1. * task_update['cores'] *
                     (task_update['time_processing_end'] -
                      task_update['time_prologue_end']))
status_code_map = {
0: 'initialized',
1: 'assigned',
2: 'successful',
3: 'failed',
4: 'aborted',
6: 'published',
7: 'merging',
8: 'merged'
}
task_update['status_text'] = status_code_map[task_update['status']]
cache_map = {0: 'cold cache', 1: 'hot cache', 2: 'dedicated'}
task_update['cache_text'] = cache_map[task_update['cache']]
except Exception as e:
logger.error(e)
return
logger.debug("calculating TaskUpdate time intervals")
try:
task_update['runtime'] = \
max(task_update['time_processing_end'] -
task_update['time_wrapper_start'], 0)
            # transfer duration is end minus start; as written the
            # original operands were reversed and always clamped to 0
            task_update['time_input_transfer'] = \
                max(task_update['time_transfer_in_end'] -
                    task_update['time_transfer_in_start'], 0)
task_update['time_startup'] = \
max(task_update['time_wrapper_start'] -
task_update['time_transfer_in_end'], 0)
task_update['time_release_setup'] = \
max(task_update['time_wrapper_ready'] -
task_update['time_wrapper_start'], 0)
task_update['time_stage_in'] = \
max(task_update['time_stage_in_end'] -
task_update['time_wrapper_ready'], 0)
task_update['time_prologue'] = \
max(task_update['time_prologue_end'] -
task_update['time_stage_in_end'], 0)
task_update['time_overhead'] = \
max(task_update['time_wrapper_ready'] -
task_update['time_wrapper_start'], 0)
task_update['time_executable'] = \
max(task_update['time_processing_end'] -
task_update['time_prologue_end'], 0)
task_update['time_epilogue'] = \
max(task_update['time_epilogue_end'] -
task_update['time_processing_end'], 0)
task_update['time_stage_out'] = \
max(task_update['time_stage_out_end'] -
task_update['time_epilogue_end'], 0)
task_update['time_output_transfer_wait'] = \
max(task_update['time_transfer_out_start'] -
task_update['time_stage_out_end'], 0)
task_update['time_output_transfer_work_queue'] = \
max(task_update['time_transfer_out_end'] -
task_update['time_transfer_out_start'], 0)
task_update['time_total_eviction'] = \
max(task_update['time_total_on_worker'] -
task_update['time_on_worker'], 0)
if task_update['exit_code'] == 0:
task_update['time_total_overhead'] = \
max(task_update['time_prologue_end'] -
task_update['time_transfer_in_start'], 0)
task_update['time_total_processing'] = \
max(task_update['time_processing_end'] -
task_update['time_prologue_end'], 0)
task_update['time_total_stage_out'] = \
max(task_update['time_transfer_out_end'] -
task_update['time_processing_end'], 0)
else:
task_update['time_total_failed'] = \
task_update['time_total_on_worker']
except Exception as e:
logger.error(e)
logger.debug("parsing TaskUpdate timestamps")
try:
timestamp_keys = ['time_processing_end',
'time_prologue_end',
'time_retrieved',
'time_stage_in_end',
'time_stage_out_end',
'time_transfer_in_end',
'time_transfer_in_start',
'time_transfer_out_end',
'time_transfer_out_start',
'time_wrapper_ready',
'time_wrapper_start',
'time_epilogue_end']
for key in timestamp_keys:
task_update[key] = dt.datetime.utcfromtimestamp(
float(str(task_update[key])[:10]))
except Exception as e:
logger.error(e)
self.update_histogram_bins(task_update, 'TaskUpdate')
logger.debug("sending TaskUpdate document to Elasticsearch")
try:
doc = {'doc': {'TaskUpdate': task_update,
'timestamp': task_update['time_retrieved']},
'doc_as_upsert': True}
self.client.update(index=self.prefix + '_lobster_tasks',
doc_type='task', id=task_update['id'],
body=doc)
except Exception as e:
logger.error(e)
def index_stats(self, now, left, times, log_attributes, stats, category):
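        """Assemble a stats-log document for one category, convert the
        cumulative worker counters into per-interval differences,
        derive the residual time buckets, and index the document under
        a microsecond-resolution ID.
        """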
logger.debug("parsing lobster stats log")
try:
keys = ['timestamp', 'units_left'] + \
['total_{}_time'.format(k) for k in sorted(times.keys())] + \
log_attributes + ['category']
values = \
[dt.datetime.utcfromtimestamp(int(now.strftime('%s'))), left] + \
[times[k] for k in sorted(times.keys())] + \
[getattr(stats, a) for a in log_attributes] + [category]
stats = dict(zip(keys, values))
stats['committed_memory_GB'] = stats['committed_memory'] / 1024.0
stats['total_memory_GB'] = stats['total_memory'] / 1024.0
stats['committed_disk_GB'] = stats['committed_disk'] / 1024.0
stats['total_disk_GB'] = stats['total_disk'] / 1024.0
stats['start_time'] = dt.datetime.utcfromtimestamp(
float(str(stats['start_time'])[:10]))
stats['time_when_started'] = dt.datetime.utcfromtimestamp(
float(str(stats['time_when_started'])[:10]))
except Exception as e:
logger.error(e)
return
try:
search_previous = es_dsl.Search(
using=self.client, index=self.prefix + '_lobster_stats') \
.filter('match', category=category) \
.sort('-timestamp').extra(size=1)
response_previous = search_previous.execute()
if len(response_previous) > 0:
previous = response_previous[0].to_dict()
fields = ['timestamp', 'workers_lost', 'workers_able',
'workers_connected', 'workers_idled_out',
'workers_busy', 'workers_fast_aborted',
'workers_blacklisted', 'workers_joined',
'workers_idle', 'workers_released', 'workers_ready',
'workers_removed', 'workers_full', 'workers_init']
stats = self.unroll_cumulative_fields(stats, previous, fields)
except Exception as e:
logger.error(e)
try:
if 'timestamp_diff' in stats:
stats['time_other_lobster'] = \
max(stats['timestamp_diff'] -
stats['total_status_time'] -
stats['total_create_time'] -
stats['total_action_time'] -
stats['total_update_time'] -
stats['total_fetch_time'] -
stats['total_return_time'], 0)
stats['time_other_wq'] = \
max(stats['timestamp_diff'] - stats['time_send'] -
stats['time_receive'] - stats['time_status_msgs'] -
stats['time_internal'] - stats['time_polling'] -
stats['time_application'], 0)
stats['time_idle'] = \
stats['timestamp_diff'] * stats['idle_percentage']
except Exception as e:
logger.error(e)
return
logger.debug("sending lobster stats document to Elasticsearch")
try:
self.client.index(index=self.prefix + '_lobster_stats',
doc_type='log', body=stats,
id=str(int(int(now.strftime('%s')) * 1e6 +
now.microsecond)))
except Exception as e:
logger.error(e)
def index_summary(self, summary):
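        """Upsert per-workflow summary rows (skipping the first row of
        ``summary``) and aggregate them into per-category summaries in
        the ``_lobster_summaries`` index.
        """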
logger.debug("updating summary documents")
keys = ['label', 'events', 'events_read', 'events_written', 'units',
'units_unmasked', 'units_written', 'units_merged',
'units_stuck', 'units_failed', 'units_skipped', 'units_left',
'percent_progress', 'percent_merged']
workflow_summaries = {}
for item in list(summary)[1:]:
workflow_summary = dict(zip(keys, item))
if workflow_summary['percent_progress']:
workflow_summary['percent_progress'] = \
float(workflow_summary['percent_progress'].replace('%', ''))
if workflow_summary['percent_merged']:
workflow_summary['percent_merged'] = \
float(workflow_summary['percent_merged'].replace('%', ''))
workflow_summaries[workflow_summary['label']] = workflow_summary
workflow_doc = {'doc': workflow_summary,
'doc_as_upsert': True}
if workflow_summary['label'] == 'Total':
self.client.update(
index=self.prefix + '_lobster_summaries',
doc_type='total', body=workflow_doc,
id=workflow_summary['label'])
else:
self.client.update(
index=self.prefix + '_lobster_summaries',
doc_type='workflow', body=workflow_doc,
id=workflow_summary['label'])
for category in self.categories:
category_summary = {key: 0 for key in keys[1:-2]}
for workflow in self.categories[category]:
for item in category_summary:
category_summary[item] += \
workflow_summaries[workflow][item]
category_summary['label'] = category
            category_summary['percent_progress'] = round(
                100.0 * category_summary['units_written'] /
                category_summary['units_unmasked'], 1) \
                if category_summary['units_unmasked'] > 0 else 0
            category_summary['percent_merged'] = round(
                100.0 * category_summary['units_merged'] /
                category_summary['units_written'], 1) \
                if category_summary['units_written'] > 0 else 0
category_doc = {'doc': category_summary,
'doc_as_upsert': True}
self.client.update(
index=self.prefix + '_lobster_summaries',
doc_type='category', body=category_doc,
id=category)