import datetime
import getpass
import os
import pickle
from lobster.core.workflow import Category
from lobster.util import Configurable
class Items(object):
"""
Collection similar to `namedtuple`, but can be pickled.
Parameters
----------
args : list
List of objects the collection should contain
key : function
Function to obtain the key of an object in the collection
"""
def __init__(self, args, key=None):
for arg in args:
attr = key(arg) if key else arg
if attr in self.__dict__:
raise AttributeError("Attribute already defined: {}".format(attr))
setattr(self, attr, arg)
self.__sequence = args
def __iter__(self):
return iter(self.__sequence)
def __len__(self):
return len(self.__sequence)
def __getitem__(self, n):
return self.__sequence[n]
def __repr__(self):
if len(self.__sequence) == 0:
return '[]'
def indent(text):
lines = text.splitlines()
if len(lines) <= 1:
return text
return "\n".join(" " + l for l in lines).strip()
return '[\n {}\n]'.format(',\n '.join(indent(repr(e)) for e in self.__sequence))
[docs]class Config(Configurable):
"""
Top-level Lobster configuration object
This configuration object will fully specify a Lobster project,
including several :class:`~lobster.core.workflow.Workflow` instances
and a :class:`~lobster.se.StorageConfiguration`.
Parameters
----------
workdir : str
The working directory to be used for the project. Note that
this should be on a local filesystem to avoid problems with the
database.
storage : StorageConfiguration
The configuration for the storage element for output and input
files.
workflows : list
A list of :class:`~lobster.core.workflow.Workflow` to process.
label : str
A string to identify this project by. This will be used in the
CMS dashboard, where it appears as
``lobster_<user>_<label>_<hash>``, and in conjunction with
`WorkQueue`, where the project will be referred to as
``lobster_<user>_<label>``. The default is the date in the
format `YYYYmmdd`.
advanced : AdvancedOptions
More options for advanced users.
plotdir : str
A directory to store monitoring pages in.
foremen_logs : list
A list of :class:`str` pointing to the `WorkQueue` foremen logs.
"""
_mutable = {}
def __init__(self, workdir, storage, workflows, label=None, advanced=None, plotdir=None, foremen_logs=None,
base_directory=None, base_configuration=None, startup_directory=None, elk=None):
"""
Top-level configuration object for Lobster
"""
self.label = '{}_{}'.format(
getpass.getuser(),
label or datetime.datetime.now().strftime('%Y%m%d')
)
self.workdir = os.path.expanduser(os.path.expandvars(workdir))
if plotdir is not None:
self.plotdir = os.path.expanduser(os.path.expandvars(plotdir))
else:
self.plotdir = None
self.foremen_logs = foremen_logs
self.storage = storage
self.workflows = Items(workflows, key=lambda w: w.label)
self.advanced = advanced if advanced else AdvancedOptions()
self.elk = elk
cats = list(set([w.category for w in workflows])) + [Category(name='merge', cores=1)]
self.categories = Items(cats, key=lambda c: c.name)
self.base_directory = base_directory
self.base_configuration = base_configuration
self.startup_directory = startup_directory
self.storage.activate()
def __repr__(self):
s = "from lobster import cmssw\nfrom lobster.core import *\n\n"
for cat in self.categories:
if cat.name == 'merge':
continue
s += "category_{} = {}\n\n".format(cat.name, repr(cat))
for wflow in self.workflows:
s += "workflow_{} = {}\n\n".format(wflow.label, repr(wflow))
override = {'workflows': '[{}]'.format(
', '.join(['workflow_' + w.label for w in self.workflows]))}
s += "config = " + Configurable.__repr__(self, override)
return s
@classmethod
def load(cls, path):
try:
with open(os.path.join(path, 'config.pkl'), 'rb') as f:
cfg = pickle.load(f)
cfg.storage.activate(failures=False)
return cfg
except IOError as e:
print e
raise IOError("can't load configuration from {0}".format(
os.path.join(path, 'config.pkl')))
def save(self):
with open(os.path.join(self.workdir, 'config.pkl'), 'wb') as f:
pickle.dump(self, f)
[docs]class AdvancedOptions(Configurable):
"""
Advanced options for tuning Lobster
Attributes modifiable at runtime:
* `payload`
* `threshold_for_failure`
* `threshold_for_skipping`
Parameters
----------
abort_threshold : int
After how many successful tasks outliers in runtime should be
killed.
abort_multiplier : int
How many standard deviations a task is allowed to go over the
average task runtime.
bad_exit_codes : list
A list of exit codes that are considered to come from bad
workers. As soon as a task returns with an exit code from this
list, the worker it ran on will be blacklisted and no more
tasks send to it.
dashboard : :class:`~lobster.cmssw.Dashboard`
Use the CMS dashboard to report task status. Set or `False` to
disable.
dump_core : bool
Produce core dumps. Useful to debug `WorkQueue`.
email : str
The email address you want to receive emails from Lobster.
full_monitoring : bool
Produce full monitoring output. Useful to debug `WorkQueue`.
log_level : int
How much logging output to show. Goes from 1 to 5, where 1 is
the most verbose (including a lot of debug output), and 5 is
practically quiet.
osg_version : str
The version of OSG you want lobster to run on.
payload : int
How many tasks to keep in the queue (minimum). Note that the
payload will increase with the number of cores available to
Lobster. This is just the minimum with no workers connected.
proxy : :class:`~lobster.cmssw.Proxy`
An authentication mechanism to access data. Set to `False` to
disable.
threshold_for_failure : int
How often a single unit may fail to be processed before Lobster
will not attempt to process it any longer.
threshold_for_skipping : int
How often a single file may fail to be accessed before Lobster
will not attempt to process it any longer.
wq_max_retries : int
How often `WorkQueue` will attempt to process a task before
handing it back to Lobster. `WorkQueue` will only reprocess
evicted tasks automatically.
wq_port : int
WorkQueue Master port number.
Defaults to -1 to look for an available port.
xrootd_servers : list
A list of xrootd servers to use to access remote data.
Defaults to `cmsxrootd.fnal.gov`.
"""
_mutable = {
'bad_exit_codes': (None, [], False),
'payload': (None, [], False),
'threshold_for_failure': ('source.update_stuck', [], False),
'threshold_for_skipping': ('source.update_stuck', [], False),
'xrootd_servers': ('source.copy_siteconf', [], False)
}
def __init__(self,
abort_threshold=10,
abort_multiplier=4,
bad_exit_codes=None,
dashboard=None,
dump_core=False,
email=None,
full_monitoring=False,
log_level=2,
osg_version=None,
payload=10,
proxy=None,
threshold_for_failure=30,
threshold_for_skipping=30,
wq_max_retries=10,
wq_port=-1,
xrootd_servers=None):
from lobster import cmssw
self.osg_version = osg_version
if not osg_version:
osg_location = os.environ.get("OSG_LOCATION")
if not osg_location:
raise AttributeError("No OSG version specified or in the environment.")
self.osg_version = osg_location.rsplit('/', 3)[1]
self.abort_threshold = abort_threshold
self.abort_multiplier = abort_multiplier
self.bad_exit_codes = bad_exit_codes if bad_exit_codes else [169]
self.dashboard = dashboard
if dashboard is None:
self.dashboard = cmssw.Dashboard()
elif not dashboard:
self.dashboard = cmssw.Monitor()
self.dump_core = dump_core
self.email = email
self.full_monitoring = full_monitoring
self.log_level = log_level
self.payload = payload
self.proxy = proxy if proxy is not None else cmssw.Proxy()
self.threshold_for_failure = threshold_for_failure
self.threshold_for_skipping = threshold_for_skipping
self.wq_max_retries = wq_max_retries
self.wq_port = wq_port
self.xrootd_servers = xrootd_servers if xrootd_servers else ['cmsxrootd.fnal.gov']