Configuration

Example configurations

All configurations in this section are also available in the examples directory of the Lobster source.

Simple dataset processing

The following is an example of a simple Python configuration used to process a single dataset:

import datetime

from lobster import cmssw
from lobster.core import AdvancedOptions, Category, Config, StorageConfiguration, Workflow

version = datetime.datetime.now().strftime('%Y%m%d_%H%M')

storage = StorageConfiguration(
    output=[
        "hdfs://eddie.crc.nd.edu:19000/store/user/$USER/lobster_test_" + version,
        "file:///hadoop/store/user/$USER/lobster_test_" + version,
        # ND is not in the XrootD redirector, thus hardcode server.
        # Note the double-slash after the hostname!
        "root://deepthought.crc.nd.edu//store/user/$USER/lobster_test_" + version,
        "chirp://eddie.crc.nd.edu:9094/store/user/$USER/lobster_test_" + version,
        "gsiftp://T3_US_NotreDame/store/user/$USER/lobster_test_" + version,
        "srm://T3_US_NotreDame/store/user/$USER/lobster_test_" + version
    ]
)

processing = Category(
    name='processing',
    cores=1,
    runtime=900,
    memory=1000
)

workflows = []

ttH = Workflow(
    label='ttH',
    dataset=cmssw.Dataset(
        dataset='/ttHToNonbb_M125_13TeV_powheg_pythia8/RunIIFall15MiniAODv2-PU25nsData2015v1_76X_mcRun2_asymptotic_v12-v1/MINIAODSIM',
        events_per_task=50000
    ),
    category=processing,
    command='cmsRun simple_pset.py',
    publish_label='test',
    merge_size='3.5G',
    outputs=['output.root']
)

workflows.append(ttH)

config = Config(
    workdir='/tmpscratch/users/$USER/lobster_test_' + version,
    plotdir='~/www/lobster/test_' + version,
    storage=storage,
    workflows=workflows,
    advanced=AdvancedOptions(
        bad_exit_codes=[127, 160],
        log_level=1
    )
)

The following configuration runs a custom ROOT macro over the same dataset:

import datetime

from lobster import cmssw
from lobster.core import AdvancedOptions, Category, Config, StorageConfiguration, Workflow

version = datetime.datetime.now().strftime('%Y%m%d_%H%M')

storage = StorageConfiguration(
    output=[
        "hdfs://eddie.crc.nd.edu:19000/store/user/$USER/lobster_test_" + version,
        "file:///hadoop/store/user/$USER/lobster_test_" + version,
        # ND is not in the XrootD redirector, thus hardcode server.
        # Note the double-slash after the hostname!
        "root://deepthought.crc.nd.edu//store/user/$USER/lobster_test_" + version,
        "chirp://eddie.crc.nd.edu:9094/store/user/$USER/lobster_test_" + version,
        "gsiftp://T3_US_NotreDame/store/user/$USER/lobster_test_" + version,
        "srm://T3_US_NotreDame/store/user/$USER/lobster_test_" + version
    ]
)

processing = Category(
    name='processing',
    cores=1,
    runtime=900,
    memory=1000
)

workflows = []

ttH = Workflow(
    label='ttH',
    dataset=cmssw.Dataset(
        dataset='/ttHToNonbb_M125_13TeV_powheg_pythia8/RunIIFall15MiniAODv2-PU25nsData2015v1_76X_mcRun2_asymptotic_v12-v1/MINIAODSIM',
        lumis_per_task=20,
        file_based=True
    ),
    category=processing,
    command='root -b -q -l script_macro.C @outputfiles @inputfiles',
    extra_inputs=['script_macro.C'],
    publish_label='test',
    merge_command='hadd @outputfiles @inputfiles',
    merge_size='3.5G',
    outputs=['output.root']
)

workflows.append(ttH)

config = Config(
    workdir='/tmpscratch/users/$USER/lobster_test_' + version,
    plotdir='~/www/lobster/test_' + version,
    storage=storage,
    workflows=workflows,
    advanced=AdvancedOptions(
        bad_exit_codes=[127, 160],
        log_level=1
    )
)

MC generation à la production

Lobster can reproduce the production workflows used in CMS for Monte-Carlo production. First, the setup scripts for each step of the workflow have to be downloaded and the release areas prepared:

#!/bin/sh

set -e

mkdir -p mc_gen
cd mc_gen

source /cvmfs/cms.cern.ch/cmsset_default.sh

curl -k https://cms-pdmv.cern.ch/mcm/public/restapi/requests/get_setup/HIG-RunIIWinter15wmLHE-00196 > setup01_lhe.sh
curl -k https://cms-pdmv.cern.ch/mcm/public/restapi/requests/get_setup/HIG-RunIISummer15GS-00177 > setup02_gs.sh
curl -k https://cms-pdmv.cern.ch/mcm/public/restapi/requests/get_setup/HIG-RunIIFall15DR76-00243 > setup03_dr.sh
curl -k https://cms-pdmv.cern.ch/mcm/public/restapi/requests/get_setup/HIG-RunIIFall15MiniAODv2-00224 > setup04_v4.sh

sed -i 's@/afs/.*@/cvmfs/cms.cern.ch/cmsset_default.sh@g' setup*.sh
sed -i 's@export X509.*@@' setup*.sh

for f in *.sh; do
	sh "$f"
done

cat <<EOF >> HIG-RunIIWinter15wmLHE-00196_1_cfg.py
process.maxEvents.input = cms.untracked.int32(1)
process.externalLHEProducer.nEvents = cms.untracked.uint32(1)
EOF

cat <<EOF >> HIG-RunIISummer15GS-00177_1_cfg.py
process.source = cms.Source("PoolSource", fileNames = cms.untracked.vstring('file:HIG-RunIIWinter15wmLHE-00196.root'))
EOF

cat <<EOF >> HIG-RunIIFall15DR76-00243_1_cfg.py
process.source.fileNames = cms.untracked.vstring('file:HIG-RunIISummer15GS-00177.root')
EOF

cat <<EOF >> HIG-RunIIFall15MiniAODv2-00224_1_cfg.py
process.source.fileNames = cms.untracked.vstring('file:HIG-RunIIFall15DR76-00243.root')
EOF

if [ -n "$RUN_MC" ]; then
	cd CMSSW_7_1_16_patch1/; cmsenv; cd -
	cmsRun -n 4 HIG-RunIIWinter15wmLHE-00196_1_cfg.py

	cd CMSSW_7_1_18/; cmsenv; cd -
	cmsRun -n 4 HIG-RunIISummer15GS-00177_1_cfg.py

	cd CMSSW_7_6_1/; cmsenv; cd -
	cmsRun -n 4 HIG-RunIIFall15DR76-00243_1_cfg.py
	cmsRun -n 4 HIG-RunIIFall15DR76-00243_2_cfg.py

	cd CMSSW_7_6_3/; cmsenv; cd -
	cmsRun -n 4 HIG-RunIIFall15MiniAODv2-00224_1_cfg.py
fi

The setup created by this script can be run by the following Lobster configuration, which sports a workflow for each step of the official MC production chain:

import datetime

from lobster import cmssw
from lobster.core import AdvancedOptions, Category, Config, StorageConfiguration, Workflow
from lobster.core import ParentDataset, ProductionDataset

version = datetime.datetime.now().strftime('%Y%m%d')

storage = StorageConfiguration(
    output=[
        "hdfs://eddie.crc.nd.edu:19000/store/user/$USER/lobster_mc_" + version,
        "file:///hadoop/store/user/$USER/lobster_mc_" + version,
        "root://deepthought.crc.nd.edu//store/user/$USER/lobster_mc_" + version,
        # All output URLs must point to the same physical storage location.
        "chirp://eddie.crc.nd.edu:9094/store/user/$USER/lobster_mc_" + version,
        "gsiftp://T3_US_NotreDame/store/user/$USER/lobster_mc_" + version,
        "srm://T3_US_NotreDame/store/user/$USER/lobster_mc_" + version,
    ]
)

workflows = []

lhe = Workflow(
    label='lhe_step',
    pset='mc_gen/HIG-RunIIWinter15wmLHE-00196_1_cfg.py',
    sandbox=cmssw.Sandbox(release='mc_gen/CMSSW_7_1_16_patch1'),
    merge_size='125M',
    dataset=ProductionDataset(
        total_events=25000,
        events_per_lumi=25,
        lumis_per_task=10
    ),
    category=Category(
        name='lhe',
        cores=1,
        memory=2000
    )
)

gs = Workflow(
    label='gs_step',
    pset='mc_gen/HIG-RunIISummer15GS-00177_1_cfg.py',
    sandbox=cmssw.Sandbox(release='mc_gen/CMSSW_7_1_18'),
    merge_size='500M',
    dataset=ParentDataset(
        parent=lhe,
        units_per_task=1
    ),
    category=Category(
        name='gs',
        cores=1,
        memory=2000,
        runtime=45 * 60
    )
)

digi = Workflow(
    label='digi_step',
    pset='mc_gen/HIG-RunIIFall15DR76-00243_1_cfg.py',
    sandbox=cmssw.Sandbox(release='mc_gen/CMSSW_7_6_1'),
    merge_size='1G',
    dataset=ParentDataset(
        parent=gs,
        units_per_task=10
    ),
    category=Category(
        name='digi',
        cores=1,
        memory=2600,
        runtime=45 * 60,
        tasks_max=100
    )
)

reco = Workflow(
    label='reco_step',
    pset='mc_gen/HIG-RunIIFall15DR76-00243_2_cfg.py',
    sandbox=cmssw.Sandbox(release='mc_gen/CMSSW_7_6_1'),
    # Explicitly specify outputs, since the dependency processing only
    # works for workflows with one output file, but the configuration
    # includes two.
    outputs=['HIG-RunIIFall15DR76-00243.root'],
    merge_size='1G',
    dataset=ParentDataset(
        parent=digi,
        units_per_task=6
    ),
    category=Category(
        name='reco',
        cores=4,
        memory=2800,
        runtime=45 * 60,
        tasks_min=10
    )
)

maod = Workflow(
    label='mAOD_step',
    pset='mc_gen/HIG-RunIIFall15MiniAODv2-00224_1_cfg.py',
    sandbox=cmssw.Sandbox(release='mc_gen/CMSSW_7_6_3'),
    merge_size='500M',
    dataset=ParentDataset(
        parent=reco,
        units_per_task=60
    ),
    category=Category(
        name='mAOD',
        cores=2,
        memory=2000,
        runtime=30 * 60
    )
)

config = Config(
    workdir='/tmpscratch/users/$USER/lobster_mc_' + version,
    plotdir='~/www/lobster/mc_' + version,
    storage=storage,
    workflows=[lhe, gs, digi, reco, maod],
    advanced=AdvancedOptions(log_level=1)
)

The configuration components

General Options

class lobster.core.config.Config(workdir, storage, workflows, label=None, advanced=None, plotdir=None, foremen_logs=None, base_directory=None, base_configuration=None, startup_directory=None, elk=None)[source]

Top-level Lobster configuration object

This configuration object will fully specify a Lobster project, including several Workflow instances and a StorageConfiguration.

Parameters:
  • workdir (str) – The working directory to be used for the project. Note that this should be on a local filesystem to avoid problems with the database.
  • storage (StorageConfiguration) – The configuration for the storage element for output and input files.
  • workflows (list) – A list of Workflow instances to process.
  • label (str) – A string to identify this project by. This will be used in the CMS dashboard, where it appears as lobster_<user>_<label>_<hash>, and in conjunction with WorkQueue, where the project will be referred to as lobster_<user>_<label>. The default is the date in the format YYYYmmdd.
  • advanced (AdvancedOptions) – More options for advanced users.
  • plotdir (str) – A directory to store monitoring pages in.
  • foremen_logs (list) – A list of str pointing to the WorkQueue foremen logs.
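
As a rough illustration of the naming scheme described for label, the following sketch builds the default label and the name under which WorkQueue refers to the project. The helpers default_label and workqueue_project_name and the user jdoe are hypothetical and not part of Lobster. The dashboard name is left out because this section does not say how its hash is derived.

```python
import datetime


def default_label(today):
    """Default project label: the date formatted as YYYYmmdd."""
    return today.strftime('%Y%m%d')


def workqueue_project_name(user, label):
    """Project name in the documented form lobster_<user>_<label>."""
    return 'lobster_{0}_{1}'.format(user, label)


# A fixed date keeps the example reproducible.
label = default_label(datetime.date(2016, 3, 1))
print(label)
print(workqueue_project_name('jdoe', label))
```
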
class lobster.core.config.AdvancedOptions(abort_threshold=10, abort_multiplier=4, bad_exit_codes=None, dashboard=None, dump_core=False, email=None, full_monitoring=False, log_level=2, osg_version=None, payload=10, proxy=None, threshold_for_failure=30, threshold_for_skipping=30, wq_max_retries=10, wq_port=-1, xrootd_servers=None)[source]

Advanced options for tuning Lobster

Attributes modifiable at runtime:

  • payload
  • threshold_for_failure
  • threshold_for_skipping
Parameters:
  • abort_threshold (int) – After how many successful tasks outliers in runtime should be killed.
  • abort_multiplier (int) – How many standard deviations a task is allowed to go over the average task runtime.
  • bad_exit_codes (list) – A list of exit codes that are considered to come from bad workers. As soon as a task returns with an exit code from this list, the worker it ran on will be blacklisted and no more tasks sent to it.
  • dashboard (Dashboard) – Use the CMS dashboard to report task status. Set to False to disable.
  • dump_core (bool) – Produce core dumps. Useful to debug WorkQueue.
  • email (str) – The email address you want to receive emails from Lobster.
  • full_monitoring (bool) – Produce full monitoring output. Useful to debug WorkQueue.
  • log_level (int) – How much logging output to show. Goes from 1 to 5, where 1 is the most verbose (including a lot of debug output), and 5 is practically quiet.
  • osg_version (str) – The version of OSG you want lobster to run on.
  • payload (int) – How many tasks to keep in the queue (minimum). Note that the payload will increase with the number of cores available to Lobster. This is just the minimum with no workers connected.
  • proxy (Proxy) – An authentication mechanism to access data. Set to False to disable.
  • threshold_for_failure (int) – How often a single unit may fail to be processed before Lobster will not attempt to process it any longer.
  • threshold_for_skipping (int) – How often a single file may fail to be accessed before Lobster will not attempt to process it any longer.
  • wq_max_retries (int) – How often WorkQueue will attempt to process a task before handing it back to Lobster. WorkQueue will only reprocess evicted tasks automatically.
  • wq_port (int) – WorkQueue Master port number. Defaults to -1 to look for an available port.
  • xrootd_servers (list) – A list of xrootd servers to use to access remote data. Defaults to cmsxrootd.fnal.gov.
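
One possible reading of abort_threshold and abort_multiplier is sketched below: once enough tasks have succeeded, any task running longer than the average runtime plus a multiple of the standard deviation counts as an outlier. The helper runtime_cutoff is hypothetical, and the use of the population standard deviation is an assumption. Lobster's actual bookkeeping may differ.

```python
import statistics


def runtime_cutoff(runtimes, abort_threshold=10, abort_multiplier=4):
    """Return the runtime (in seconds) above which a task is an outlier.

    Returns None while fewer than abort_threshold tasks have succeeded,
    because no outliers are killed before that point.
    """
    if len(runtimes) < abort_threshold:
        return None
    mean = statistics.mean(runtimes)
    # Assumption: population standard deviation of the successful runtimes.
    spread = statistics.pstdev(runtimes)
    return mean + abort_multiplier * spread


# Ten successful tasks: five took 100 s, five took 120 s.
successful = [100] * 5 + [120] * 5
print(runtime_cutoff(successful))
```

With the defaults, nine successful tasks would not yet yield a cutoff, so no task would be treated as an outlier at that point.
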
class lobster.se.StorageConfiguration(output, input=None, use_work_queue_for_inputs=False, use_work_queue_for_outputs=False, shuffle_inputs=False, shuffle_outputs=False, disable_input_streaming=False, disable_stage_in_acceleration=False)[source]

Container for storage element configuration.

Uses URLs of the form {protocol}://{server}/{path} to specify input and output locations, where the server is omitted for file and hadoop access. All output URLs should point to the same physical storage to ensure consistent data handling. Storage elements within CMS, such as T2_CH_CERN, will be expanded for the srm and root protocols. Supported protocols:

  • file
  • gsiftp
  • hadoop
  • chirp
  • srm
  • root

The chirp protocol requires an instance of a Chirp server.

Attributes modifiable at runtime:

  • input
  • output
Parameters:
  • output (list) – A list of URLs to access output storage
  • input (list) – A list of URLs to access input data
  • use_work_queue_for_inputs (bool) – Use WorkQueue to transfer input data. Will incur a severe running penalty when used.
  • use_work_queue_for_outputs (bool) – Use WorkQueue to transfer output data. Will incur a severe running penalty when used.
  • shuffle_inputs (bool) – Shuffle the list of input URLs when passing them to the task. Will provide basic load-distribution.
  • shuffle_outputs (bool) – Shuffle the list of output URLs when passing them to the task. Will provide basic load-distribution.
  • disable_input_streaming (bool) – Turn off streaming input data via, e.g., XrootD.
  • disable_stage_in_acceleration (bool) – By default, tasks with many input files will test input URLs for the first successful one, which will then be used to access the remaining input files. By using this setting, all input URLs will be attempted for all input files.
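
To make the {protocol}://{server}/{path} form concrete, the sketch below splits a few URLs of the kind used in the example configurations with the Python 3 standard library. It does not use Lobster itself. The helper describe and the user jdoe (standing in for $USER) are hypothetical.

```python
from urllib.parse import urlsplit


def describe(url):
    """Split a storage URL into (protocol, server, path)."""
    parts = urlsplit(url)
    return parts.scheme, parts.netloc, parts.path


urls = [
    # No server for file access, hence the three slashes.
    'file:///hadoop/store/user/jdoe/lobster_test',
    # The double slash after the hostname is kept as part of the path.
    'root://deepthought.crc.nd.edu//store/user/jdoe/lobster_test',
    # The server part may carry a port number.
    'chirp://eddie.crc.nd.edu:9094/store/user/jdoe/lobster_test',
]

for url in urls:
    print(describe(url))
```
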

Workflow specification

class lobster.core.workflow.Category(name, mode='max_throughput', cores=None, memory=None, disk=None, runtime=None, tasks_max=None, tasks_min=None)[source]

Resource specification for one or more Workflow.

This information will be passed on to WorkQueue, which will forcibly terminate tasks of Workflow in the group that exceed the specified resources.

Attributes modifiable at runtime:

  • tasks_min
  • tasks_max
  • runtime
Parameters:
  • name (str) – The name of the resource group.
  • mode (str) – Dictates how WorkQueue handles exhausted resources. Possible values are: fixed (task fails), max (the maximum allowed resource consumption is set by the maximum seen in tasks of that category; tasks are automatically adjusted and retried), min_waste (same as max, but allocations prioritize minimizing waste), or max_throughput (same as max, but allocations prioritize maximizing throughput).
  • cores (int) – The max number of cores required (fixed mode), or the first guess for WorkQueue to determine the number of cores required (all other modes).
  • memory (int) – How much memory a task is allowed to use, in megabytes (fixed mode), or the starting guess for WorkQueue to determine how much memory a task requires (all other modes).
  • disk (int) – How much disk a task is allowed to use, in megabytes (fixed mode), or the starting guess for WorkQueue to determine how much disk a task requires (all other modes).
  • runtime (int) – The runtime of the task in seconds. Lobster will add a grace period to this time, and try to adjust the task size such that this runtime is achieved.
  • tasks_max (int) – How many tasks should be in the queue (running or waiting) at the same time.
  • tasks_min (int) – The minimum of how many tasks should be in the queue (waiting) at the same time.
class lobster.core.workflow.Workflow(label, dataset, command, category=Category( default, mode=<Mock id='139883738238800'> ), publish_label=None, cleanup_input=False, merge_size=-1, sandbox=None, unique_arguments=None, extra_inputs=None, outputs=None, output_format='{base}_{id}.{ext}', local=False, globaltag=None, merge_command='cmsRun')[source]

A specification for processing a dataset.

Parameters:
  • label (str) – The shorthand name of the workflow. This is used as a reference throughout Lobster.
  • dataset (Dataset) – The specification of data to be processed. Can be any of the dataset related classes.
  • category (Category) – The category of resource specification this workflow belongs to.
  • publish_label (str) – The label to be used for the publication database.
  • cleanup_input (bool) – Delete input files after processing.
  • merge_size (str) – Activates output file merging when set. Accepts the suffixes k, m, g for kilobyte, megabyte, …
  • sandbox (Sandbox or list of Sandbox) – The sandbox(es) to use. Currently can be a Sandbox. When multiple sandboxes are used, one sandbox per computing architecture to be run on is expected, each containing the same release; a ValueError will be raised otherwise.
  • command (str) –

    The command to run when executing the workflow.

    The command string may contain @args, @outputfiles, and @inputfiles, which will be replaced by unique arguments and output as well as input files, respectively. For running CMSSW workflows, it is sufficient to use:

    cmsRun pset.py
    

    where the file pset.py will be automatically added to the sandbox and the input source of the parameter set will be modified to use the correct input files. Note that otherwise, any used files will have to be included in extra_inputs.

  • extra_inputs (list) – Additional inputs outside the sandbox needed to process the workflow.
  • unique_arguments (list) – A list of arguments. Each element of the dataset is processed once for each argument in this list. The unique argument is also passed to the executable.
  • outputs (list) – A list of strings which specifies the files produced by the workflow. If outputs=[], no output files will be returned. If outputs=None, outputs will be automatically determined for CMSSW workflows.
  • output_format (str) – How the output files should be renamed on the storage element. This is a new-style format string, allowing for the fields base, id, and ext, for the basename of the output file, the ID of the task, and the extension of the output file.
  • local (bool) – If set to True, Lobster will assume this workflow’s input is present on the output storage element.
  • globaltag (str) – Which GlobalTag this workflow uses. Needed for publication of CMSSW workflows, and can be automatically determined for these.
  • merge_command (str) –

    Accepts cmsRun (the default), or a custom command. Tells Lobster what command to use for merging. If outputs are autodetermined (outputs=None), cmsRun will be used for EDM output and hadd will be used otherwise.

    When merging plain ROOT files the following should be used:

    merge_command="hadd @outputfiles @inputfiles"
    

    See the specification for the command parameter about passing input and output file values.
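
The output_format fields and the command placeholders described above can be illustrated with plain string handling. This is a minimal sketch and not Lobster's implementation: the helpers storage_name and expand_command are hypothetical, and both the way the basename is split from the extension and the space-separated joining of file lists are assumptions.

```python
def storage_name(local_name, task_id, output_format='{base}_{id}.{ext}'):
    """Rename an output file using the new-style format string."""
    # Assumption: the extension is everything after the last dot.
    base, ext = local_name.rsplit('.', 1)
    return output_format.format(base=base, id=task_id, ext=ext)


def expand_command(command, args, inputs, outputs):
    """Replace @args, @outputfiles, and @inputfiles in a command string."""
    replacements = {
        '@args': args,
        '@outputfiles': ' '.join(outputs),
        '@inputfiles': ' '.join(inputs),
    }
    for placeholder, value in replacements.items():
        command = command.replace(placeholder, value)
    return command


print(storage_name('output.root', 17))
print(expand_command('hadd @outputfiles @inputfiles', '',
                     ['a.root', 'b.root'], ['merged.root']))
```
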

class lobster.core.sandbox.Sandbox(recycle=None, blacklist=None)[source]
Parameters:
  • recycle (str) – A path to an existing sandbox to re-use.
  • blacklist (list) – A specification of paths to not pack into the sandbox.

Dataset specification

class lobster.core.dataset.Dataset(files, files_per_task=1, patterns=None)[source]

A simple dataset specification.

Runs over files found in a list of directories or specified directly.

Parameters:
  • files (list) – A list of files or directories to process. May also be a str pointing to a single file or directory.
  • files_per_task (int) – How many files to process in one task. Defaults to 1.
  • patterns (list) – A list of shell-style file patterns to match filenames against. Defaults to None and will use all files considered.
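
The effect of patterns and files_per_task can be sketched with the standard fnmatch module, which implements shell-style matching. Whether Lobster uses fnmatch internally is an assumption. The helpers select_files and group_into_tasks and the file names are hypothetical.

```python
import fnmatch


def select_files(filenames, patterns=None):
    """Keep files matching any pattern; with patterns=None, keep all."""
    if patterns is None:
        return list(filenames)
    return [name for name in filenames
            if any(fnmatch.fnmatch(name, pattern) for pattern in patterns)]


def group_into_tasks(filenames, files_per_task=1):
    """Split the file list into consecutive chunks, one chunk per task."""
    return [filenames[i:i + files_per_task]
            for i in range(0, len(filenames), files_per_task)]


files = ['tree_1.root', 'tree_2.root', 'tree_3.root', 'log.txt']
selected = select_files(files, patterns=['*.root'])
tasks = group_into_tasks(selected, files_per_task=2)
print(tasks)
```
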
class lobster.core.dataset.EmptyDataset(number_of_tasks=1)[source]

Dataset specification for non-cmsRun workflows with no input files.

Parameters:
  • number_of_tasks (int) – How many tasks to run.
class lobster.core.dataset.ParentDataset(parent, units_per_task=1)[source]

Process the output of another workflow.

Parameters:
  • parent (Workflow) – The parent workflow to process.
  • units_per_task (int) – How much of the parent dataset to process at once. Can be changed by Lobster to match the user-specified task runtime.
class lobster.core.dataset.ProductionDataset(total_events, events_per_lumi=500, lumis_per_task=1, randomize_seeds=True)[source]

Dataset specification for Monte-Carlo event generation.

Parameters:
  • total_events (int) – How many events to generate.
  • events_per_lumi (int) – How many events to generate in one luminosity section.
  • lumis_per_task (int) – How many lumis to produce per task. Can be changed by Lobster to match the user-specified task runtime.
  • randomize_seeds (bool) – Use random seeds every time a task is run.
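
The three sizing parameters interact through simple arithmetic, worked through below for the values of the LHE step in the MC generation example (total_events=25000, events_per_lumi=25, lumis_per_task=10). The helper production_layout is hypothetical, and rounding up partial luminosity sections and tasks is an assumption. The result is only the initial layout, since Lobster can change lumis_per_task to match the requested runtime.

```python
def production_layout(total_events, events_per_lumi=500, lumis_per_task=1):
    """Return (luminosity sections, tasks, events per task)."""
    # Ceiling division with integers only; assumes partial units round up.
    lumis = -(-total_events // events_per_lumi)
    tasks = -(-lumis // lumis_per_task)
    events_per_task = events_per_lumi * lumis_per_task
    return lumis, tasks, events_per_task


lumis, tasks, events_per_task = production_layout(
    25000, events_per_lumi=25, lumis_per_task=10)
print(lumis, tasks, events_per_task)
```

Under these assumptions the LHE step starts out as 1000 luminosity sections spread over 100 tasks of 250 events each.
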
class lobster.core.dataset.MultiProductionDataset(gridpacks, events_per_gridpack, events_per_lumi=500, lumis_per_task=1, randomize_seeds=True)[source]

Dataset specification for Monte-Carlo event generation from a set of gridpacks.

Parameters:
  • gridpacks (list) – A list of gridpack files or directories to process. May also be a str pointing to a single gridpack file or directory.
  • events_per_gridpack (int) – How many events to generate per gridpack.
  • events_per_lumi (int) – How many events to generate in one luminosity section.
  • lumis_per_task (int) – How many lumis to produce per task. Can be changed by Lobster to match the user-specified task runtime.
  • randomize_seeds (bool) – Use random seeds every time a task is run.

Monitoring

class lobster.monitor.elk.ElkInterface(es_host, es_port, kib_host, kib_port, project, dashboards=None, refresh_interval=300)[source]

Enables ELK stack monitoring for the current Lobster run using an existing Elasticsearch cluster.

Attributes modifiable at runtime:

  • es_host
  • es_port
  • kib_host
  • kib_port
  • dashboards
  • refresh_interval

Parameters:
  • es_host (str) – Host running Elasticsearch cluster.
  • es_port (int) – Port number running Elasticsearch HTTP service.
  • kib_host (str) – Host running Kibana instance connected to Elasticsearch cluster.
  • kib_port (int) – Port number running Kibana HTTP service.
  • user (str) – User ID to label Elasticsearch indices and Kibana objects.
  • project (str) – Project ID to label Elasticsearch indices and Kibana objects.
  • dashboards (list) – List of dashboards to include from the Kibana templates. Defaults to including only the core dashboard. Available dashboards: Core, Advanced, Tasks.
  • refresh_interval (int) – Refresh interval for Kibana dashboards, in seconds. Defaults to 300 seconds = 5 minutes.

Note

At Notre Dame, Elasticsearch is accessible at elk.crc.nd.edu:9200 and Kibana is accessible at elk.crc.nd.edu:5601