import logging
import os
import random
import re
if 'LOBSTER_SKIP_HADOOP' not in os.environ:
import snakebite.client
import snakebite.errors
import subprocess
import xml.dom.minidom
from contextlib import contextmanager
from lobster.util import Configurable
import Chirp as chirp
logger = logging.getLogger('lobster.se')
# Breaks a URL down into 3 parts: the protocol, a optional server, and
# the path
url_re = re.compile(r'^([a-z]+)://([^/]*)(.*)/?$')
class FileSystem(object):
"""Singleton class as an interface for filesystem interactions.
Needs to be configured before first use, with two lists of
`StorageElement` implementations. See the documentation of
``configure()`` for details.
"""
_defaults = []
_alternatives = []
def __init__(self):
self.__file__ = __file__
self.__name__ = 'fs'
def __getattr__(self, attr):
if attr in self.__dict__:
return self.__dict__[attr]
def switch(*args, **kwargs):
logger.debug("resolving file system method '{0}' with arguments {1!r}, {2!r}".format(attr, args, kwargs))
lasterror = None
for imp in FileSystem._defaults:
try:
return imp.fixresult(getattr(imp, attr)(*map(imp.lfn2pfn, args), **kwargs))
except imp.errors as e:
logger.debug(
"method {0} of {1} failed with {2}, using args {3}, {4}".format(attr, imp, e, args, kwargs))
lasterror = e
except TypeError as e:
logger.error("binding received an unexpected type; method {0} of {1} failed with {2}, using "
"args {3}, {4}".format(attr, imp, e, args, kwargs))
lasterror = e
raise AttributeError(
"no resolution found for method '{0}' with arguments '{1}': {2}".format(attr, args, lasterror))
return switch
def lfn2pfn(self, lfn, instance):
for imp in FileSystem._defaults:
if isinstance(imp, instance):
return imp.lfn2pfn(lfn)
@classmethod
def configure(cls, defaults, alternatives):
"""Configure the filesystem access methods.
Parameters
----------
defaults : list
List of :class:`StorageElement` implementations. These
methods will be used in order by default to perform file
system interactions.
alternatives : list
As `defaults`, specifies methods to perform file system
interactions. These methods are only active within the
context of ``fs.alternative()``.
"""
cls._defaults = defaults
cls._alternatives = alternatives
@contextmanager
def alternative(self):
tmp = FileSystem._defaults
FileSystem._defaults = FileSystem._alternatives
try:
yield
finally:
FileSystem._defaults = tmp
class StorageElement(object):
"""Storage Element base class.
Provides some basic handling of relative paths. To be subclassed by
implementations.
"""
def __init__(self, pfnprefix):
"""Baseclass of a storage element.
Parameters
----------
pfnprefix : string
The path prefix under which relative file names can be
accessed.
"""
self._pfnprefix = pfnprefix
if not self._pfnprefix.endswith('/'):
self._pfnprefix += '/'
@property
def errors(self):
return (IOError, OSError)
def lfn2pfn(self, path):
if path.startswith('/'):
p = os.path.join(self._pfnprefix, path[1:])
else:
p = os.path.join(self._pfnprefix, path)
m = url_re.match(p)
if m:
protocol, server, path = url_re.match(p).groups()
path = os.path.normpath(path)
return "{0}://{1}{2}/".format(protocol, server, path)
return os.path.normpath(p)
def fixresult(self, res):
def pfn2lfn(p):
return p.replace(self._pfnprefix, '', 1)
if isinstance(res, basestring):
return pfn2lfn(res)
try:
return map(pfn2lfn, res)
except TypeError:
return res
def makedirs(self, path):
if re.match(r'^..(?:/..)*$', path):
if len(path) > 2 + 100 * 3:
# fail for excessive path recursion
raise NotImplementedError
parent = os.path.join(path, '..')
elif path == '':
parent = '..'
else:
parent = os.path.dirname(path)
if not self.exists(parent):
self.makedirs(parent)
if self.exists(path):
return
mode = self.permissions(parent)
self.mkdir(path, mode=mode)
class Local(StorageElement):
def __init__(self, pfnprefix=''):
super(Local, self).__init__(pfnprefix)
self.exists = os.path.exists
self.getsize = os.path.getsize
self.isdir = self._guard(os.path.isdir)
self.isfile = self._guard(os.path.isfile)
def _guard(self, method):
"""Protect method against non-existent paths.
"""
def guarded(path):
if not os.path.exists(path):
raise IOError("path does not exist: {0}".format(path))
return method(path)
return guarded
def ls(self, path):
for fn in os.listdir(path):
yield os.path.join(path, fn)
def mkdir(self, path, mode=None):
os.mkdir(path)
if mode:
os.chmod(path, mode)
def permissions(self, path):
return os.stat(path).st_mode & 0777
def remove(self, *paths):
for path in paths:
try:
os.remove(path)
except OSError:
pass
class Hadoop(StorageElement):
def __init__(self, host, port, pfnprefix='/hadoop'):
super(Hadoop, self).__init__(pfnprefix)
self.__c = snakebite.client.Client(host, int(port))
@property
def errors(self):
return (Exception,)
def exists(self, path):
try:
self.__c.stat([path])
return True
except snakebite.errors.FileNotFoundException:
return False
def getsize(self, path):
return self.__c.stat([path])['blocksize']
def isdir(self, path):
return self.__c.stat([path])['file_type'] == 'd'
def isfile(self, path):
return self.__c.stat([path])['file_type'] == 'f'
def ls(self, path):
for data in self.__c.ls([path]):
yield data['path']
def mkdir(self, path, mode):
for data in self.__c.mkdir([path], mode=mode):
pass
def permissions(self, path):
return self.__c.stat([path])['permission']
def remove(self, *paths):
"""Remove paths.
First try passing the entire list of paths to be removed. This
approach is almost instantaneous, but fails if any of the paths
do not exist. In that case, we try again, one by one. This takes
tens of milliseconds per path, but ensures that any paths that
do exist are removed.
"""
try:
for data in self.__c.delete(list(paths)):
pass
except snakebite.errors.FileNotFoundException:
for path in paths:
try:
self.__c.delete([path]).next()
except snakebite.errors.FileNotFoundException:
pass
class Chirp(StorageElement):
def __init__(self, server, pfnprefix):
super(Chirp, self).__init__(pfnprefix)
self.__c = chirp.Client(server, timeout=10)
def exists(self, path):
try:
self.__c.stat(str(path))
return True
except IOError:
return False
def getsize(self, path):
return self.__c.stat(str(path)).size
def isdir(self, path):
return len(self.__c.ls(str(path))) > 0
def isfile(self, path):
return len(self.__c.ls(str(path))) == 0
def ls(self, path):
for f in self.__c.ls(str(path)):
if f.path not in ('.', '..'):
yield os.path.join(path, f.path)
def mkdir(self, path, mode=None):
self.__c.mkdir(str(path))
if mode:
self.__c.chmod(str(path), mode)
def permissions(self, path):
return self.__c.stat(str(path)).mode & 0777
def remove(self, *paths):
for path in paths:
self.__c.rm(str(path))
class SRM(StorageElement):
def __init__(self, pfnprefix):
super(SRM, self).__init__(pfnprefix)
def execute(self, cmd, *paths, **kwargs):
cmds = cmd.split()
args = ['gfal-' + cmds[0]] + cmds[1:] + list(paths)
try:
p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env={})
pout, err = p.communicate()
if p.returncode != 0 and not kwargs.get('safe', False):
msg = "Failed to execute '{0}':\n{1}\n{2}".format(' '.join(args), err, pout)
raise IOError(msg)
except OSError:
raise AttributeError("srm utilities not available")
return pout
def exists(self, path):
try:
self.execute('stat', path)
return True
except Exception:
return False
def getsize(self, path):
output = self.execute('stat', path)
return output.splitlines()[1].split()[1]
def isdir(self, path):
try:
output = self.execute('stat', path)
return 'directory' in output.splitlines()[1]
except Exception:
return False
def isfile(self, path):
try:
output = self.execute('stat', path)
return 'regular file' in output.splitlines()[1]
except Exception:
return False
def ls(self, path):
for p in self.execute('ls', path).splitlines():
yield os.path.join(path, p)
def mkdir(self, path, mode=None):
self.execute('mkdir -p', path)
def permissions(self, path):
output = self.execute('stat', path)
try:
return int(output.splitlines()[2][9:13], 8)
except IndexError:
raise IOError
def remove(self, *paths):
while len(paths) != 0:
# FIXME safe is active because SRM does not care about directories.
self.execute('rm -r', *(paths[:50]), safe=True)
paths = paths[50:]
class XrootD(StorageElement):
def __init__(self, pfnprefix):
super(XrootD, self).__init__(pfnprefix)
def execute(self, cmd, *paths, **kwargs):
cmds = cmd.split()
output = []
for path in paths:
# Break the path into server and directory
protocol, server, path = url_re.match(path).groups()
args = ['xrdfs', server] + cmds + [path]
try:
p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env={})
pout, err = p.communicate()
if p.returncode != 0 and not kwargs.get('safe', False):
msg = "Failed to execute '{0}':\n{1}\n{2}".format(' '.join(args), err, pout)
raise IOError(msg)
output.append(pout)
except OSError:
raise AttributeError("xrd utilities not available")
return '/n'.join(output)
def exists(self, path):
try:
self.execute('stat', path)
return True
except Exception:
return False
def getsize(self, path):
output = self.execute('stat', path)
for line in output.splitlines():
field, value = line.split(':')
if field == 'Size':
return value.strip()
# Shouldn't get here unless we don't find the size. Raise an exception
msg = 'xrdfs stat did not return file size. Command output:\n{}'.format(output)
raise AttributeError(msg)
def isdir(self, path):
try:
output = self.execute('stat', path)
for line in output.splitlines():
field, value = line.split(':', 1)
if field == 'Flags':
# Do some silly stuff to get the flags...
flags = value.split()[-1].strip('()').split('|')
return 'IsDir' in flags
# If we got here, never found any mention of flags, just say false.
return False
except Exception:
return False
def isfile(self, path):
try:
output = self.execute('stat', path)
for line in output.splitlines():
field, value = line.split(':', 1)
if field == 'Flags':
# Do some silly stuff to get the flags...
flags = value.split()[-1].strip('()').split('|')
return 'IsDir' not in flags
# If we got here, never found any mention of flags, just say false.
return False
except Exception:
return False
def ls(self, path):
protocol, server, _ = url_re.match(path).groups()
for p in self.execute('ls', path).splitlines():
# We need to add the protocol back so that `lfn2pfn` above
# can recognize and remove just the pfnprefix.
yield "{0}://{1}{2}".format(protocol, server, p)
def mkdir(self, path, mode=None):
self.execute('mkdir -p', path)
def remove(self, *paths):
# It doesn't seem like Xrdfs supports either recursive or batch removal, so let's do it one at a time.
for path in paths:
if self.isdir(path):
for dirpath in self.ls(path):
self.remove(dirpath) # Recursive because the directory might contain directories
self.execute('rmdir', path)
else:
self.execute('rm', path)
[docs]class StorageConfiguration(Configurable):
"""
Container for storage element configuration.
Uses URLs of the form `{protocol}://{server}/{path}` to specify input
and output locations, where the `server` is omitted for `file` and
`hadoop` access. All output URLs should point to the same physical
storage, to ensure consistent data handling. Storage elements within
CMS, as in `T2_CH_CERN` will be expanded for the `srm` and `root`
protocol. Protocols supported:
* `file`
* `gsiftp`
* `hadoop`
* `chirp`
* `srm`
* `root`
The `chirp` protocol requires an instance of a `Chirp` server.
Attributes modifiable at runtime:
* `input`
* `output`
Parameters
----------
output : list
A list of URLs to access output storage
input : list
A list of URLs to access input data
use_work_queue_for_inputs : bool
Use `WorkQueue` to transfer input data. Will encur a severe
running penalty when used.
use_work_queue_for_outputs : bool
Use `WorkQueue` to transfer output data. Will encur a severe
running penalty when used.
shuffle_inputs : bool
Shuffle the list of input URLs when passing them to the task.
Will provide basic load-distribution.
shuffle_outputs : bool
Shuffle the list of output URLs when passing them to the task.
Will provide basic load-distribution.
disable_input_streaming : bool
Turn of streaming input data via, e.g., `XrootD`.
disable_stage_in_acceleration : bool
By default, tasks with many input files will test input URLs
for the first successful one, which will then be used to access
the remaining input files. By using this setting, all input
URLs will be attempted for all input files.
"""
_mutable = {
'input': ('config.storage.activate', [], False),
'output': ('config.storage.activate', [], False)
}
# Map protocol shorthands to actual protocol names
__protocols = {
'gsiftp': 'gsiftp',
'srm': 'srmv2',
'root': 'xrootd'
}
# Matches CMS tiered computing site as found in
# /cvmfs/cms.cern.ch/SITECONF/
__site_re = re.compile(r'^T[0123]_(?:[A-Z]{2}_)?[A-Za-z0-9_\-]+$')
def __init__(self,
output,
input=None,
use_work_queue_for_inputs=False,
use_work_queue_for_outputs=False,
shuffle_inputs=False,
shuffle_outputs=False,
disable_input_streaming=False,
disable_stage_in_acceleration=False):
if input is None:
self.input = []
else:
self.input = [
self.expand_site(os.path.expanduser(os.path.expandvars(i))) for i in input]
self.output = [self.expand_site(os.path.expanduser(os.path.expandvars(o))) for o in output]
self.use_work_queue_for_inputs = use_work_queue_for_inputs
self.use_work_queue_for_outputs = use_work_queue_for_outputs
self.shuffle_inputs = shuffle_inputs
self.shuffle_outputs = shuffle_outputs
self.disable_input_streaming = disable_input_streaming
self.disable_stage_in_acceleration = disable_stage_in_acceleration
logger.debug("using input location {0}".format(self.input))
logger.debug("using output location {0}".format(self.output))
def _find_match(self, protocol, site, path):
"""Extracts the LFN to PFN translation from the SITECONF.
>>> StorageConfiguration({})._find_match('xrootd', 'T3_US_NotreDame', '/store/user/spam/ham/eggs')
(u'/+store/(.*)', u'root://xrootd.unl.edu//store/\\\\1')
"""
file = os.path.join('/cvmfs/cms.cern.ch/SITECONF', site, 'PhEDEx/storage.xml')
doc = xml.dom.minidom.parse(file)
for e in doc.getElementsByTagName("lfn-to-pfn"):
if e.attributes["protocol"].value != protocol:
continue
if 'destination-match' in e.attributes.keys() and \
not re.match(e.attributes['destination-match'].value, site):
continue
if path and len(path) > 0 and \
'path-match' in e.attributes.keys() and \
re.match(e.attributes['path-match'].value, path) is None:
continue
return e.attributes["path-match"].value, e.attributes["result"].value.replace('$1', r'\1')
raise AttributeError(
"No match found for protocol {0} at site {1}, using {2}".format(protocol, site, path))
def expand_site(self, url):
"""Expands a CMS site label in a url to the corresponding server.
>>> StorageConfiguration({}).expand_site('root://T3_US_NotreDame/store/user/spam/ham/eggs')
u'root://xrootd.unl.edu//store/user/spam/ham/eggs'
"""
protocol, server, path = url_re.match(url).groups()
if self.__site_re.match(server) and protocol in self.__protocols:
regexp, result = self._find_match(self.__protocols[protocol], server, path)
return re.sub(regexp, result, path)
if path.endswith('/'):
path = path[:-1]
return "{0}://{1}{2}/".format(protocol, server, path)
def transfer_inputs(self):
"""Indicates whether input files need to be transferred manually.
"""
return self.use_work_queue_for_inputs
def transfer_outputs(self):
"""Indicates whether output files need to be transferred manually.
"""
return self.use_work_queue_for_outputs
def local(self, filename):
for url in self.input + self.output:
protocol, server, path = url_re.match(url).groups()
if protocol != 'file':
continue
fn = os.path.join(path, filename)
if os.path.isfile(fn):
return fn
raise IOError("Can't create LFN without local storage access")
def _initialize(self, methods, failures):
for url in methods:
protocol, server, path = url_re.match(url).groups()
if protocol == 'chirp':
try:
yield Chirp(server, path)
except chirp.AuthenticationFailure:
if failures:
raise AttributeError("cannot access chirp server")
elif protocol == 'file':
yield Local(path)
elif protocol == 'hdfs':
host, port = server.split(':')
yield Hadoop(host, port, path)
elif protocol == 'srm':
yield SRM(url)
elif protocol == 'root':
yield XrootD(url)
else:
logger.debug("implementation of master access missing for URL {0}".format(url))
def activate(self, failures=True):
"""Sets file system access methods.
Replaces default file system access methods with the ones specified
per configuration for input and output storage element access.
"""
FileSystem.configure(
list(self._initialize(self.output, failures)),
list(self._initialize(self.input, failures))
)
def preprocess(self, parameters, merge):
"""Adjust the storage transfer parameters sent with a task.
Parameters
----------
parameters : dict
The task parameters to alter. This method will add keys
'input', 'output', and 'disable streaming'.
merge : bool
Specify if this is a merging parameter set.
"""
if self.shuffle_inputs:
random.shuffle(self.input)
if self.shuffle_outputs or (self.shuffle_inputs and merge):
random.shuffle(self.output)
parameters['input'] = self.input if not merge else self.output
parameters['output'] = self.output
parameters['disable streaming'] = self.disable_input_streaming
if not self.disable_stage_in_acceleration:
parameters['accelerate stage-in'] = 3