Source code for luna.common.PipelineBuilder
import yaml, json
import luna.transforms
[docs]def load(stream):
if isinstance(stream, str):
stream = open(stream, 'r')
pipeline_config = yaml.safe_load( stream )
pipeline = []
for stage, job_config in enumerate(pipeline_config['stages']):
print (f"STAGE {stage}: {job_config['job']}")
print (json.dumps(job_config, indent=4))
method_to_call = getattr(luna.transforms, job_config['job'])
pipeline.append ( (method_to_call, job_config) )
return pipeline