Pipewelder Core API

The core Pipewelder API.

class pipewelder.core.Pipeline(conn, s3_conn, template, dirpath)

A class defining a single pipeline definition and associated tasks.

Create a Pipeline based on definition dict template.

dirpath is a directory containing a ‘values.json’ file, a ‘run’ executable, and a ‘tasks’ directory. conn is a DataPipelineConnection and s3_conn is an S3Connection.
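The sketch below builds a directory with this layout in a temporary location. The helper names `make_pipeline_dir` and `looks_like_pipeline_dir` are hypothetical. The contents written to `values.json` (a `values` object holding `myS3InputDirectory`) are an assumed schema for illustration only.

```python
import json
import os
import stat
import tempfile


def make_pipeline_dir(root, name, s3_input_dir):
    """Create a hypothetical pipeline directory: values.json, run, tasks/."""
    dirpath = os.path.join(root, name)
    os.makedirs(os.path.join(dirpath, "tasks"))  # also creates dirpath
    # ASSUMPTION: this values.json schema is illustrative, not authoritative.
    with open(os.path.join(dirpath, "values.json"), "w") as f:
        json.dump({"values": {"myS3InputDirectory": s3_input_dir}}, f)
    # 'run' must be executable; here it is a trivial shell script.
    run_path = os.path.join(dirpath, "run")
    with open(run_path, "w") as f:
        f.write("#!/bin/sh\necho running\n")
    os.chmod(run_path, os.stat(run_path).st_mode | stat.S_IXUSR)
    return dirpath


def looks_like_pipeline_dir(dirpath):
    """Check for the three entries the Pipeline docstring requires."""
    return (
        os.path.isfile(os.path.join(dirpath, "values.json"))
        and os.access(os.path.join(dirpath, "run"), os.X_OK)
        and os.path.isdir(os.path.join(dirpath, "tasks"))
    )


with tempfile.TemporaryDirectory() as root:
    d = make_pipeline_dir(root, "example", "s3://example-bucket/pipelines/example")
    print(looks_like_pipeline_dir(d))
```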

activate()

Activate this pipeline definition in AWS.

Deletes the existing pipeline if it has previously been activated.

Returns True if successful.

api_objects()

Return a dict containing the pipeline objects in AWS API format.
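In the AWS Data Pipeline API, each pipeline object has an `id`, a `name`, and a list of `fields`. Each field carries a `key` plus either a `stringValue` or a `refValue`. The sketch below converts one object written in definition-file style into that shape. `to_api_object` is a hypothetical helper, not part of Pipewelder. Two details are assumptions about the definition-file convention: `{"ref": ...}` marks a reference, and list values expand into repeated keys.

```python
def to_api_object(obj):
    """Convert one definition-file object into the API's id/name/fields form.

    ASSUMPTIONS: {"ref": "someId"} marks a reference, list values expand
    into repeated keys, and a missing name falls back to the id.
    """
    fields = []
    for key, value in obj.items():
        if key in ("id", "name"):
            continue
        values = value if isinstance(value, list) else [value]
        for v in values:
            if isinstance(v, dict) and "ref" in v:
                fields.append({"key": key, "refValue": v["ref"]})
            else:
                fields.append({"key": key, "stringValue": str(v)})
    return {"id": obj["id"], "name": obj.get("name", obj["id"]), "fields": fields}


default = {
    "id": "Default",
    "name": "Default",
    "scheduleType": "cron",
    "schedule": {"ref": "DefaultSchedule"},
}
print(to_api_object(default))
```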

api_parameters()

Return a dict containing the pipeline parameters in AWS API format.

api_tags()

Return a list containing the pipeline tags in AWS API format.

api_values()

Return a dict containing the pipeline param values in AWS API format.

create()

Create a pipeline in AWS if it does not already exist.

Returns the pipeline id.

delete()

Delete this pipeline definition from AWS.

Returns True if successful.

is_valid()

Returns True if the pipeline definition validates with AWS.

put_definition()

Put this pipeline definition to AWS.

Returns True if successful.

upload()

Upload the contents of dirpath to S3.

The destination path in S3 is determined by ‘myS3InputDirectory’ in the ‘values.json’ file for this pipeline. Existing contents of the ‘tasks’ subdirectory are deleted.
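The sketch below maps local files to S3 keys. It assumes every file under `dirpath` is placed under the `myS3InputDirectory` prefix with its relative path preserved. `planned_uploads` and `_split_s3_uri` are hypothetical names. The sketch only computes the destination keys: it performs no uploads and does not delete existing `tasks` contents.

```python
import os
import tempfile


def _split_s3_uri(s3_uri):
    # Hypothetical helper: 's3://bucket/key/path' -> ('bucket', 'key/path').
    bucket, _, path = s3_uri[len("s3://"):].partition("/")
    return bucket, path


def planned_uploads(dirpath, s3_input_dir):
    """List (bucket, key, local_path) triples for every file under dirpath."""
    bucket, prefix = _split_s3_uri(s3_input_dir)
    prefix = prefix.rstrip("/")
    uploads = []
    for parent, _dirs, files in os.walk(dirpath):
        for filename in files:
            local = os.path.join(parent, filename)
            # S3 keys always use '/', regardless of the local separator.
            rel = os.path.relpath(local, dirpath).replace(os.sep, "/")
            key = prefix + "/" + rel if prefix else rel
            uploads.append((bucket, key, local))
    return sorted(uploads)


with tempfile.TemporaryDirectory() as d:
    os.makedirs(os.path.join(d, "tasks"))
    for name in ("values.json", "run", os.path.join("tasks", "a.sh")):
        open(os.path.join(d, name), "w").close()
    for bucket, key, _local in planned_uploads(d, "s3://example-bucket/pipelines/example"):
        print(bucket, key)
```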

Returns True if successful.

class pipewelder.core.Pipewelder(conn, template_path, s3_conn=None)

A collection of Pipelines sharing a definition template.

conn is a boto.datapipeline.layer1.DataPipelineConnection instance used to manipulate added pipelines, s3_conn is a boto.s3.connection.S3Connection used to upload pipeline tasks to S3, and template_path is the path to a local file containing the template pipeline definition.

activate()

Activate all pipeline definitions, deleting existing pipelines if needed.

Returns True if successful.

add_pipeline(dirpath)

Load a new Pipeline object based on the files contained in dirpath.

are_pipelines_valid()

Returns True if all pipeline definitions validate with AWS.

delete()

Delete all pipeline definitions.

Returns True if successful.

put_definition()

Puts definitions for all pipelines.

Returns True if successful.

upload()

Upload files to S3 corresponding to each pipeline and its tasks.

Returns True if successful.

validate()

Synonym for are_pipelines_valid().
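Each collection-level method reports success for the whole set of pipelines. The sketch below shows that aggregation using a hypothetical `StubPipeline`, which records calls instead of contacting AWS. The validate, upload, activate order in the hypothetical `deploy` function is an assumed workflow, not necessarily the one Pipewelder's own tooling uses.

```python
class StubPipeline:
    """Hypothetical stand-in for pipewelder.core.Pipeline; makes no AWS calls."""

    def __init__(self, name, valid=True):
        self.name = name
        self.valid = valid
        self.calls = []

    def is_valid(self):
        self.calls.append("is_valid")
        return self.valid

    def upload(self):
        self.calls.append("upload")
        return True

    def activate(self):
        self.calls.append("activate")
        return True


def deploy(pipelines):
    """Validate all pipelines, then upload and activate them.

    Each step succeeds only if it succeeds for every pipeline. A list is
    built before all() so that every pipeline is attempted at each step.
    """
    if not all([p.is_valid() for p in pipelines]):
        return False
    for step in ("upload", "activate"):
        if not all([getattr(p, step)() for p in pipelines]):
            return False
    return True


print(deploy([StubPipeline("a"), StubPipeline("b")]))
```

Passing a list to `all()`, instead of a generator, means a failure in one pipeline does not stop the remaining pipelines from being attempted. Whether Pipewelder itself short-circuits is not stated in this reference.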

pipewelder.core.adjusted_to_future(timestamp, period)

Return timestamp string, adjusted to the future if necessary.

If timestamp is in the future, it will be returned unchanged. If it’s in the past, period will be repeatedly added until the result is in the future.

All times are assumed to be in UTC.

>>> adjusted_to_future('2199-01-01T00:00:00', '1 days')
'2199-01-01T00:00:00'
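The adjustment rule can be sketched in a few lines. Instead of adding `period` in a loop, this version computes the number of whole periods needed in a single step. The result is the same: the earliest `timestamp + k * period` (with `k >= 0`) that is later than the current UTC time. Some names and details are assumptions:

- `adjusted_to_future_sketch` and `_parse_period` are hypothetical names.
- The optional `now` argument exists only to make the behavior testable.
- The format `%Y-%m-%dT%H:%M:%S` is inferred from the example above.

```python
from datetime import datetime, timedelta, timezone

FORMAT = "%Y-%m-%dT%H:%M:%S"  # assumed from the doctest above


def _parse_period(period):
    # Hypothetical minimal parser: "15 minutes" -> timedelta(minutes=15).
    # Expects a plural unit accepted by timedelta (minutes, hours, days, weeks).
    count, unit = period.split()
    return timedelta(**{unit: int(count)})


def adjusted_to_future_sketch(timestamp, period, now=None):
    """Return timestamp unchanged if it is in the future; otherwise advance it
    by whole periods until it is later than now (naive datetimes, read as UTC)."""
    if now is None:
        now = datetime.now(timezone.utc).replace(tzinfo=None)
    step = _parse_period(period)
    if step <= timedelta(0):
        raise ValueError("period must be positive")
    t = datetime.strptime(timestamp, FORMAT)
    if t <= now:
        # Number of whole periods needed to move strictly past now.
        t += ((now - t) // step + 1) * step
    return t.strftime(FORMAT)


print(adjusted_to_future_sketch("2199-01-01T00:00:00", "1 days"))  # 2199-01-01T00:00:00
print(adjusted_to_future_sketch("2015-01-01T06:00:00", "1 days",
                                now=datetime(2015, 3, 10, 12, 0)))  # 2015-03-11T06:00:00
```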

pipewelder.core.bucket_and_path(s3_uri)

Return a bucket name and key path from s3_uri.

>>> bucket_and_path('s3://pipewelder-bucket/pipewelder-test/inputs')
('pipewelder-bucket', 'pipewelder-test/inputs')
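The standard library's `urllib.parse.urlparse` can perform a comparable split: it places the bucket in `netloc` and the key path in `path`. `bucket_and_path_sketch` is a hypothetical stand-in, and rejecting non-`s3://` URIs is an added assumption.

```python
from urllib.parse import urlparse


def bucket_and_path_sketch(s3_uri):
    """Split an s3:// URI into (bucket, key path) using urlparse."""
    parsed = urlparse(s3_uri)
    if parsed.scheme != "s3":
        # ASSUMPTION: non-S3 URIs are treated as errors in this sketch.
        raise ValueError("expected an s3:// URI, got %r" % s3_uri)
    # netloc holds the bucket; path keeps a leading '/' that S3 keys omit.
    return parsed.netloc, parsed.path.lstrip("/")


print(bucket_and_path_sketch("s3://pipewelder-bucket/pipewelder-test/inputs"))
```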
pipewelder.core.definition_from_file(filename)

Return a dict containing the contents of pipeline definition filename.

pipewelder.core.definition_from_id(conn, pipeline_id)

Return a dict containing the definition of pipeline_id.

conn is a DataPipelineConnection object.

pipewelder.core.fetch_default(params, key)

Return the default associated with key from parameter list params.

If there is no default, returns None.

>>> p = [{'type': 'String', 'id': 'myParam', 'default': 'foo'}]
>>> fetch_default(p, 'myParam')
'foo'
>>> p = [{'type': 'String', 'id': 'myParam'}]
>>> fetch_default(p, 'myParam')
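The lookup amounts to scanning the parameter list for a matching `id`. Here is a minimal sketch under the hypothetical name `fetch_default_sketch`:

```python
def fetch_default_sketch(params, key):
    """Return the 'default' of the parameter whose 'id' is key, or None."""
    for param in params:
        if param.get("id") == key:
            # dict.get returns None when the parameter has no default.
            return param.get("default")
    return None


p = [{"type": "String", "id": "myParam", "default": "foo"}]
print(fetch_default_sketch(p, "myParam"))  # foo
```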

pipewelder.core.fetch_field_value(aws_response, field_name)

Return a value nested within the ‘fields’ entry of dict aws_response.

The entry in ‘fields’ whose ‘key’ is field_name holds one other item (for example ‘stringValue’), and the value of that item is returned.

>>> r = {'fields': [{'key': 'someKey', 'stringValue': 'someValue'}]}
>>> fetch_field_value(r, 'someKey')
'someValue'
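The sketch below performs the same lookup. It assumes each entry in `fields` holds `key` plus exactly one value entry, such as `stringValue` or `refValue`. The name `fetch_field_value_sketch` is hypothetical. Raising `KeyError` when no field matches is also an assumption, since the docstring does not specify that case.

```python
def fetch_field_value_sketch(aws_response, field_name):
    """Return the value stored alongside 'key' == field_name in aws_response['fields']."""
    for field in aws_response["fields"]:
        if field["key"] == field_name:
            # ASSUMPTION: each field holds 'key' plus exactly one value entry,
            # such as 'stringValue' or 'refValue'; return that entry's value.
            return next(v for k, v in field.items() if k != "key")
    # ASSUMPTION: missing fields raise KeyError in this sketch.
    raise KeyError(field_name)


r = {"fields": [{"key": "someKey", "stringValue": "someValue"}]}
print(fetch_field_value_sketch(r, "someKey"))  # someValue
```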

pipewelder.core.parse_period(period)

Return a timedelta object parsed from string period.

>>> parse_period("15 minutes")
datetime.timedelta(0, 900)
>>> parse_period("3 hours")
datetime.timedelta(0, 10800)
>>> parse_period("1 days")
datetime.timedelta(1)
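A period string like these can be parsed with a small regular expression that maps the unit onto a `datetime.timedelta` keyword. This sketch, under the hypothetical name `parse_period_sketch`, accepts only minutes, hours, days, and weeks, in singular or plural form. Larger units such as months cannot be represented by `timedelta`, so the sketch rejects them with `ValueError`.

```python
import re
from datetime import timedelta

# Map each accepted unit to the matching timedelta keyword argument.
_UNITS = {"minute": "minutes", "hour": "hours", "day": "days", "week": "weeks"}


def parse_period_sketch(period):
    """Parse strings such as "15 minutes" or "1 days" into a timedelta."""
    match = re.fullmatch(r"\s*(\d+)\s+(minute|hour|day|week)s?\s*", period)
    if match is None:
        raise ValueError("unsupported period: %r" % period)
    count, unit = match.groups()
    return timedelta(**{_UNITS[unit]: int(count)})


print(parse_period_sketch("15 minutes") == timedelta(seconds=900))  # True
```

Comparing `timedelta` values directly, rather than their reprs, avoids a difference between Python versions. Since Python 3.7, the repr uses keyword form, e.g. `datetime.timedelta(seconds=900)`.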

pipewelder.core.parsed_object(conn, pipeline_id, object_id)

Return an object dict as evaluated by Data Pipeline.

pipewelder.core.parsed_objects(conn, pipeline_id, object_ids)

Return a list of object dicts as evaluated by Data Pipeline.

pipewelder.core.state_from_id(conn, pipeline_id)

Return the @pipelineState string for object matching pipeline_id.

conn is a DataPipelineConnection object.
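In a DescribePipelines response, the `@pipelineState` value appears among the `fields` of each entry in `pipelineDescriptionList`. The sketch below reads it from a hand-written response dict of that shape, so it runs without AWS access. `state_from_description`, the pipeline id `df-EXAMPLE`, and the field values are all hypothetical.

```python
def state_from_description(response, pipeline_id):
    """Return @pipelineState for pipeline_id from a DescribePipelines-style dict."""
    for description in response["pipelineDescriptionList"]:
        if description["pipelineId"] == pipeline_id:
            for field in description["fields"]:
                if field["key"] == "@pipelineState":
                    return field["stringValue"]
    # No matching pipeline or no state field.
    return None


# Hypothetical, hand-written response used only for illustration.
response = {
    "pipelineDescriptionList": [
        {
            "pipelineId": "df-EXAMPLE",
            "name": "example",
            "fields": [
                {"key": "@pipelineState", "stringValue": "SCHEDULED"},
                {"key": "name", "stringValue": "example"},
            ],
        }
    ]
}
print(state_from_description(response, "df-EXAMPLE"))  # SCHEDULED
```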