Source code for pipewelder.cli

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
The Pipewelder command-line interface.
"""

from __future__ import print_function

import argparse
import os
import sys
import boto.datapipeline

from glob import glob

from pipewelder import metadata, util, Pipewelder

import logging
logging.basicConfig(level="INFO")


CONFIG_DEFAULTS = {
    "dirs": ["*"],
    "region": "",
    "template": "pipeline_definition.json",
    "values": [],
}


[docs]def main(argv): """Program entry point. :param argv: command-line arguments :type argv: :class:`list` """ author_strings = [] for name, email in zip(metadata.authors, metadata.emails): author_strings.append('Author: {0} <{1}>'.format(name, email)) epilog = ''' Pipewelder {version} {authors} URL: <{url}> '''.format( project=metadata.project, version=metadata.version, authors='\n'.join(author_strings), url=metadata.url) parser = argparse.ArgumentParser( prog=argv[0], formatter_class=argparse.RawDescriptionHelpFormatter, description=metadata.description, epilog=epilog) parser.add_argument( '-V', '--version', action='version', version='{0} {1}'.format(metadata.project, metadata.version)) parser.add_argument( 'action', help="""Action to take: 'validate' pipeline definitions with AWS; 'put-definition' of pipelines to AWS; 'upload' pipeline files to myInputS3Dir; 'activate' defined pipelines (also puts definitions if needed); 'delete' pipelines from AWS """) parser.add_argument( '--group', default=None, help="Group within pipewelder.json to act on; defaults to all") args = parser.parse_args(args=argv[1:]) args.action = args.action.replace('-', '_') defaults = {} if 'AWS_ACCESS_KEY_ID' not in os.environ: parser.error("Must set AWS_ACCESS_KEY_ID") if 'AWS_SECRET_ACCESS_KEY' not in os.environ: parser.error("Must set AWS_SECRET_ACCESS_KEY") if 'AWS_DEFAULT_REGION' in os.environ: defaults['region'] = os.environ['AWS_DEFAULT_REGION'] config_path = (os.path.exists('pipewelder.json') and 'pipewelder.json' or None) configs = pipewelder_configs(config_path, defaults) print("Reading configuration from {0}".format(config_path)) for name, config in configs.items(): if args.group and args.group != name: continue if name == 'defaults': continue print("Acting on configuration '{0}'".format(name)) conn = boto.datapipeline.connect_to_region(config['region']) pw = build_pipewelder(conn, config) if not execute_pipewelder_action(pw, args.action): return 1 return 0
[docs]def entry_point(): """ Zero-argument entry point for use with setuptools/distribute. """ raise SystemExit(main(sys.argv))
[docs]def build_pipewelder(conn, config): """ Return a Pipewelder object defined by *config*. """ try: pw = Pipewelder(conn, config['template']) except IOError as e: print(e) return 1 for d in config['dirs']: p = pw.add_pipeline(d) for k, v in config["values"].items(): p.values[k] = v return pw
def execute_pipewelder_action(pw, action): return_value = call_method(pw, action) if not return_value: print("Failed '{0}' action" .format(action)) return return_value
[docs]def pipewelder_configs(filename=None, defaults=None): """ Parse json from *filename* for Pipewelder object configurations. Returns a dict which maps config names to dicts of options. """ if filename is None: data = {"pipewelder": {}} dirname = os.path.abspath('.') else: dirname = os.path.dirname(os.path.abspath(filename)) data = util.load_json(filename) defaults = defaults or {} data_defaults = data.get('defaults', {}) defaults = dict(list(CONFIG_DEFAULTS.items()) + list(data_defaults.items()) + list(defaults.items())) outputs = {} for name in data: if name == 'defaults': continue this_config = dict(list(defaults.items()) + list(data[name].items())) dirs = [] with util.cd(dirname): for entry in this_config['dirs']: for item in glob(entry): if os.path.exists(os.path.join(item, 'values.json')): dirs.append(item) outputs[name] = { "name": name, "dirs": dirs, "region": this_config['region'], "template": this_config['template'], "values": this_config['values'], } return outputs
[docs]def call_method(obj, name): """ Call the method *name* on *obj*. """ return getattr(obj, name)()
if __name__ == '__main__': entry_point()