Source code for adr

import os
import re
import time
import uuid
import glob
import boto3
import zipfile
import logging
import subprocess
import fabfile
import botocore.exceptions


REGION_NAME = 'eu-central-1'


logger = logging.getLogger(__name__)


### CREATE ###########################################################

def create(region_name=REGION_NAME):
    '''Create runner

    Creates a runner consisting of a unique runner ID, an Amazon S3
    Bucket and an Amazon SQS Message Queue. The runner ID is a UUID4
    string. The name of the bucket and queue is equal to the runner ID.

    Parameters
    ----------
    region_name : str, optional
        Amazon region identifier

    Returns
    -------
    str
        Runner ID

    See Also
    --------
    launch
    queue
    process
    destroy
    create_queue
    create_bucket

    '''

    s3 = boto3.resource('s3', region_name=region_name)
    sqs = boto3.resource('sqs', region_name=region_name)

    runner_id = str(uuid.uuid4())

    create_queue(sqs, runner_id)
    create_bucket(s3, runner_id, region_name=region_name)

    return runner_id

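A minimal usage sketch (not part of the original module), assuming AWS credentials with S3 and SQS permissions are configured for the default boto3 session:

    >>> import adr
    >>> runner_id = adr.create()         # UUID4 string; names the bucket and the queue
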
def create_queue(sqs, runner_id):
    '''Create Amazon SQS Message Queue

    Parameters
    ----------
    sqs : boto3 resource
        Boto3 SQS resource object in specified region
    runner_id : str
        Runner ID and name of queue to be created

    '''

    sqs.create_queue(QueueName=runner_id)
    logger.info('Created SQS queue "{}".'.format(runner_id))

def create_bucket(s3, runner_id, region_name=REGION_NAME, ACL='private'):
    '''Create Amazon S3 Bucket

    Parameters
    ----------
    s3 : boto3 resource
        Boto3 S3 resource object in specified region
    runner_id : str
        Runner ID and name of bucket to be created
    region_name : str, optional
        Amazon region identifier that is used as constraint for
        inter-region data transfers
    ACL : str, optional
        Amazon access control list [default: private]

    '''

    s3.create_bucket(ACL=ACL,
                     Bucket=runner_id,
                     CreateBucketConfiguration={'LocationConstraint': region_name})
    logger.info('Created S3 bucket "{}".'.format(runner_id))

### LAUNCH ###########################################################

def launch(runner_id, n, region_name=REGION_NAME, **kwargs):
    '''Launch Amazon workers for specific runner

    Launches Amazon instances for a given runner and registers the
    workers in the Amazon S3 Bucket. Each Amazon instance is tagged
    with the runner ID.

    Parameters
    ----------
    runner_id : str
        Runner ID
    n : int
        Number of instances to launch
    region_name : str, optional
        Amazon region identifier
    kwargs : dict, optional
        Keyword options to :func:`launch_workers`

    Returns
    -------
    list
        List with public IP addresses of workers

    See Also
    --------
    prepare
    stop
    launch_workers
    register_workers

    '''

    ec2 = boto3.resource('ec2', region_name=region_name)
    s3 = boto3.resource('s3', region_name=region_name)

    workers = launch_workers(ec2, runner_id, n=n, **kwargs)
    register_workers(s3, runner_id, workers)

    return workers

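Continuing the sketch above, launch two workers for this runner (this relies on the account-specific defaults of launch_workers() below):

    >>> hosts = adr.launch(runner_id, 2)    # list of public IP addresses
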
def launch_workers(ec2, runner_id, n=1, ami='ami-d09b6ebf',
                   asg=['sg-13d17c7b'], akp='Amazon AeoLiS Test Key',
                   ait='m3.medium'):
    '''Launch Amazon workers, tag them and wait for them to be online

    Parameters
    ----------
    ec2 : boto3 resource
        Boto3 EC2 resource object in specified region
    runner_id : str
        Runner ID
    n : int, optional
        Number of instances to launch [default: 1]
    ami : str, optional
        Amazon Machine Image
    asg : list, optional
        List of strings with Amazon Security Groups
    akp : str, optional
        Name of Amazon Key Pair
    ait : str, optional
        Amazon Instance Type

    Returns
    -------
    list
        List with public IP addresses of workers

    '''

    instances = ec2.create_instances(ImageId=ami,
                                     MinCount=int(n),
                                     MaxCount=int(n),
                                     InstanceType=ait,
                                     KeyName=akp,
                                     SecurityGroupIds=asg)

    # wait until all instances are available
    hosts = []
    for i, instance in enumerate(instances):
        instance.wait_until_running()

        name = '{}_{}'.format(runner_id[:7], i)
        instance.create_tags(Tags=[{'Key': 'Name', 'Value': name},
                                   {'Key': 'Runner', 'Value': runner_id}])

        instance.reload()
        hosts.append(instance.public_ip_address)

        logger.info('Launched instance "{}"'.format(instance.instance_id))

    return list(set(hosts))

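The default AMI, security group and key pair are region- and account-specific and will generally not exist elsewhere; they can be overridden through the keyword arguments that launch() forwards to launch_workers(). All identifiers in this sketch are placeholders:

    >>> hosts = adr.launch(runner_id, 4,
    ...                    ami='ami-xxxxxxxx',      # hypothetical image ID
    ...                    asg=['sg-xxxxxxxx'],     # hypothetical security group
    ...                    akp='my-key-pair',       # hypothetical key pair name
    ...                    ait='t2.small')
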
def start(runner_id, region_name=REGION_NAME, hosts=None):
    '''Start stopped workers for specific runner

    Parameters
    ----------
    runner_id : str
        Runner ID
    region_name : str, optional
        Amazon region identifier
    hosts : list, optional
        List of specific hosts to start

    See Also
    --------
    stop
    register_workers

    '''

    s3 = boto3.resource('s3', region_name=region_name)

    for instance in iterate_workers(runner_id, region_name=region_name,
                                    hosts=hosts):
        if instance.state['Name'] == 'stopped':
            instance.start()

    for instance in iterate_workers(runner_id, region_name=region_name,
                                    hosts=hosts):
        instance.wait_until_running()
        instance.reload()
        logger.info('Started instance "{}"'.format(instance.instance_id))

        # register worker
        register_workers(s3, runner_id, [instance.public_ip_address])

def stop(runner_id, region_name=REGION_NAME, hosts=None):
    '''Stop running workers for specific runner

    Parameters
    ----------
    runner_id : str
        Runner ID
    region_name : str, optional
        Amazon region identifier
    hosts : list, optional
        List of specific hosts to stop

    See Also
    --------
    start
    deregister_workers

    '''

    s3 = boto3.resource('s3', region_name=region_name)

    for instance in iterate_workers(runner_id, region_name=region_name,
                                    hosts=hosts):
        if instance.state['Name'] == 'running':

            # deregister worker
            deregister_workers(s3, runner_id, [instance.public_ip_address])

            instance.stop()
            logger.info('Stopped instance "{}"'.format(instance.instance_id))

def prepare(runner_id, region_name=REGION_NAME, hosts=None, user='ubuntu',
            password=None, key_filename=None, warn_only=False, timeout=600):
    '''Prepare workers for specific runner

    Installs the Amazon Distributed Runner on all workers and starts
    processing the queue. The installation procedure is defined in
    :mod:`fabfile`, which can be used as input to ``fab`` as well.

    Parameters
    ----------
    runner_id : str
        Runner ID
    region_name : str, optional
        Amazon region identifier
    hosts : list, optional
        List of specific hosts to prepare
    user : str, optional
        SSH username
    password : str, optional
        SSH password
    key_filename : str, optional
        Path to SSH key file
    warn_only : bool, optional
        Only warn on error, but attempt to continue [default: False]
    timeout : int, optional
        Maximum duration in seconds of installation execution
        [default: 600]

    See Also
    --------
    :mod:`fabfile`

    '''

    workers = get_workers(runner_id, region_name=region_name)
    if type(hosts) is list:
        workers = [w for w in workers if w in hosts]

    with fabfile.settings(user=user,
                          password=password,
                          key_filename=key_filename,
                          hosts=workers,
                          warn_only=warn_only,
                          command_timeout=timeout,
                          skip_bad_hosts=True):
        fabfile.execute(fabfile.install)
        fabfile.execute(fabfile.stop)
        fabfile.execute(fabfile.start, runner_id=runner_id)

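A sketch of preparing freshly launched workers over SSH; the key file path is hypothetical and should match the key pair used at launch:

    >>> adr.prepare(runner_id,
    ...             user='ubuntu',
    ...             key_filename='~/.ssh/aeolis-test.pem')   # hypothetical key file
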
def register_workers(s3, runner_id, workers, prefix='_workers/'):
    '''Register workers

    A worker is registered by creating an empty file object with a
    prefix ``prefix`` and a name equal to the public IP address of the
    worker in the Amazon S3 Bucket.

    Parameters
    ----------
    s3 : boto3 resource
        Boto3 S3 resource object in specified region
    runner_id : str
        Runner ID
    workers : str or list
        String or list of strings with public IP addresses of workers
        to be registered
    prefix : str, optional
        Prefix for generated keys

    See Also
    --------
    deregister_workers

    '''

    # a single IP address may be passed as a plain string
    if isinstance(workers, str) or not isiterable(workers):
        workers = [workers]

    for worker in workers:
        s3.Object(runner_id, ''.join((prefix, worker))).put(Body='')

def deregister_workers(s3, runner_id, workers, prefix='_workers/'):
    '''Deregister workers

    A worker is deregistered by deleting the empty file object with a
    prefix ``prefix`` and a name equal to the public IP address of the
    worker in the Amazon S3 Bucket.

    Parameters
    ----------
    s3 : boto3 resource
        Boto3 S3 resource object in specified region
    runner_id : str
        Runner ID
    workers : str or list
        String or list of strings with public IP addresses of workers
        to be deregistered
    prefix : str, optional
        Prefix for generated keys

    See Also
    --------
    register_workers

    '''

    # a single IP address may be passed as a plain string
    if isinstance(workers, str) or not isiterable(workers):
        workers = [workers]

    for worker in workers:
        s3.Object(runner_id, ''.join((prefix, worker))).delete()

### PROCESS ##########################################################

def process(runner_id, workingdir='.', region_name=REGION_NAME,
            stop_on_empty=False):
    '''Start processing loop for specific runner

    Each worker needs to start this processing procedure. It polls the
    queue, processes jobs and uploads the results.

    Parameters
    ----------
    runner_id : str
        Runner ID
    workingdir : str, optional
        Working directory at processing node
    region_name : str, optional
        Amazon region identifier
    stop_on_empty : bool, optional
        Flag to quit the processing loop if no messages are left
        [default: False]

    See Also
    --------
    process_job

    '''

    s3 = boto3.resource('s3', region_name=region_name)
    sqs = boto3.resource('sqs', region_name=region_name)

    while True:
        if not process_job(sqs, s3, runner_id, workingdir=workingdir):
            if stop_on_empty:
                logger.info('No jobs left. Stop')
                break

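On a worker node, a sketch of running the processing loop; with stop_on_empty=True the worker stops polling once the queue is empty (the working directory is hypothetical and assumed to exist):

    >>> adr.process(runner_id, workingdir='/tmp/adr', stop_on_empty=True)
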
def process_job(sqs, s3, runner_id, workingdir='.'):
    '''Process specific job

    Parameters
    ----------
    sqs : boto3 resource
        Boto3 SQS resource object in specified region
    s3 : boto3 resource
        Boto3 S3 resource object in specified region
    runner_id : str
        Runner ID
    workingdir : str, optional
        Working directory at processing node

    Returns
    -------
    bool
        Returns True if a job has been processed and False otherwise

    See Also
    --------
    get_job
    download_batch
    upload_files

    '''

    # read message
    message = get_job(sqs, runner_id)
    if message is None:
        return False

    batch_id = message['Batch']
    cmd = message['Command']

    # download data
    batchpath = os.path.join(workingdir, batch_id)
    if not os.path.exists(batchpath):
        download_batch(s3, runner_id, batch_id, workingdir)
    cache_contents(batchpath)

    # run model
    shfile = 'run.sh'
    shpath = os.path.join(batchpath, shfile)
    with open(shpath, 'w') as fp:
        fp.write('#!/bin/bash\n\n')
        if 'PreProcessing' in message:
            fp.write('{}\n'.format(message['PreProcessing']))
        fp.write('{}\n'.format(cmd))
        if 'PostProcessing' in message:
            fp.write('{}\n'.format(message['PostProcessing']))
    os.chmod(shpath, 0o744)

    subprocess.call('./{}'.format(shfile),  # dtach -n `mktemp -u ~/aeolis.XXXX`
                    cwd=batchpath,
                    shell=True)

    # store data
    if 'Store' in message:
        store_patterns = message['Store'].split('|')
        upload_files(s3, runner_id, batch_id, workingdir,
                     include_patterns=store_patterns)
    restore_contents(batchpath)

    return True

def get_job(sqs, runner_id, delay=10, retry=30):
    '''Poll SQS Message Queue for job

    Parameters
    ----------
    sqs : boto3 resource
        Boto3 SQS resource object in specified region
    runner_id : str
        Runner ID
    delay : int
        Delay in seconds between polls
    retry : int
        Maximum number of polls

    Returns
    -------
    dict or None
        Message received from queue parsed with :func:`parse_message`,
        or None if no job was received within ``retry`` polls

    See Also
    --------
    parse_message

    '''

    queue = sqs.get_queue_by_name(QueueName=runner_id)

    messages = []
    for i in range(retry):

        logger.info('Polling queue "{}" ({}/{})...'.format(runner_id, i + 1, retry))

        messages = queue.receive_messages(
            MessageAttributeNames=['*'],
            MaxNumberOfMessages=1,
        )
        messages = [m for m in messages if m.body == 'execution']

        if len(messages) > 0:
            message = messages[0]
            message.delete()
            logger.info('Received message from queue "{}".'.format(runner_id))
            return parse_message(message)

        time.sleep(delay)

    return None

def upload_files(s3, runner_id, batch_id, path,
                 include_patterns=[r'\.nc$'], overwrite=False):
    '''Upload batch results to Amazon S3 Bucket

    Traverses the directory tree and uploads all files that match one
    or more regular expressions.

    Parameters
    ----------
    s3 : boto3 resource
        Boto3 S3 resource object in specified region
    runner_id : str
        Runner ID
    batch_id : str
        Batch ID
    path : str
        Root directory for traversal
    include_patterns : list, optional
        List of regular expressions from which at least one should
        match for a file to be uploaded
    overwrite : bool, optional
        Flag to enable overwriting remote files [default: False]

    '''

    for root, dirs, files in os.walk(os.path.join(path, batch_id)):
        for fname in files:
            if any([re.search(p, fname) for p in include_patterns]):
                key = '{}/{}'.format(batch_id, fname)
                if not key_exists(s3, runner_id, key) or overwrite:
                    fpath = os.path.join(root, fname)
                    s3.Object(runner_id, key).upload_file(fpath)
                    logger.info('Uploaded "{}" to "{}/" in bucket "{}".'.format(
                        os.path.relpath(fpath, path), batch_id, runner_id))

def download_batch(s3, runner_id, batch_id, path):
    '''Download batch input from Amazon S3 Bucket

    Downloads the zipped batch input and unzips it at the specified
    location.

    Parameters
    ----------
    s3 : boto3 resource
        Boto3 S3 resource object in specified region
    runner_id : str
        Runner ID
    batch_id : str
        Batch ID
    path : str
        Local download directory

    '''

    zfile = '{}.zip'.format(batch_id)
    zpath = os.path.join(path, zfile)

    s3.Object(runner_id, zfile).download_file(zpath)
    logger.info('Downloaded "{}" from bucket "{}".'.format(zfile, runner_id))

    if zipfile.is_zipfile(zpath):
        with zipfile.ZipFile(zpath, mode='r') as zh:
            zh.extractall(path)
        logger.info('Extracted "{}".'.format(zpath))

    os.unlink(zpath)
    logger.info('Removed "{}".'.format(zpath))

def parse_message(message):
    '''Parse message from SQS Message Queue

    Parameters
    ----------
    message : boto3 SQS Message
        Multi-level message from SQS Message Queue

    Returns
    -------
    dict
        Flattened message

    '''

    parsed = {}
    for k, v in message.message_attributes.items():
        parsed[k] = v['StringValue']

    return parsed

### QUEUE ############################################################

def queue(runner_id, files, region_name=REGION_NAME,
          command='aeolis {}',
          preprocessing='source ~/.envs/aeolis/bin/activate',
          postprocessing=None, store_patterns=[r'\.nc$']):
    '''Queue job

    Queues jobs to a runner by zipping the common root of all given
    input files, uploading the zipped input to the Amazon S3 Bucket
    and announcing each job to the Amazon SQS Message Queue.

    Parameters
    ----------
    runner_id : str
        Runner ID
    files : list
        List of file names used as input
    region_name : str, optional
        Amazon region identifier
    command : str, optional
        Command pattern to be executed. A single placeholder {} can be
        used to determine the location where the input file is
        inserted.
    preprocessing : str, optional
        Command to be executed preceding the ``command``.
    postprocessing : str, optional
        Command to be executed following the ``command``.
    store_patterns : list, optional
        List of regular expressions to identify files that need to be
        stored to the Amazon S3 Bucket after execution.

    See Also
    --------
    download
    queue_job
    upload_batch

    Notes
    -----
    Be aware that the common root of all input files is zipped and
    uploaded. If the input files are located in very different
    locations, these files may have a very shallow common root that is
    potentially very large.

    '''

    s3 = boto3.resource('s3', region_name=region_name)
    sqs = boto3.resource('sqs', region_name=region_name)

    files = [os.path.abspath(f) for f in files]
    root = find_root(files)
    batch_id = upload_batch(s3, runner_id, root)

    for fpath in files:
        fpath = os.path.relpath(fpath, root)
        queue_job(sqs, runner_id, batch_id,
                  command=command.format(fpath),
                  preprocessing=preprocessing,
                  postprocessing=postprocessing,
                  store_patterns=store_patterns)

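Client-side sketch: queue one job per input file. The file paths are hypothetical AeoLiS configurations that share the common root ``runs/``:

    >>> adr.queue(runner_id,
    ...           ['runs/case1/aeolis.txt',          # hypothetical input files
    ...            'runs/case2/aeolis.txt'],
    ...           command='aeolis {}',
    ...           store_patterns=[r'\.nc$'])         # upload netCDF results afterwards
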
def queue_job(sqs, runner_id, batch_id, command, store_patterns=None,
              preprocessing=None, postprocessing=None):
    '''Construct and send message to the Amazon SQS Message Queue

    Parameters
    ----------
    sqs : boto3 resource
        Boto3 SQS resource object in specified region
    runner_id : str
        Runner ID
    batch_id : str
        Batch ID
    command : str
        Command pattern to be executed. A single placeholder {} can be
        used to determine the location where the input file is
        inserted.
    store_patterns : list, optional
        List of regular expressions to identify files that need to be
        stored to the Amazon S3 Bucket after execution.
    preprocessing : str, optional
        Command to be executed preceding the ``command``.
    postprocessing : str, optional
        Command to be executed following the ``command``.

    '''

    queue = sqs.get_queue_by_name(QueueName=runner_id)

    attributes = {
        'Runner': {
            'StringValue': runner_id,
            'DataType': 'String',
        },
        'Batch': {
            'StringValue': batch_id,
            'DataType': 'String',
        },
        'Command': {
            'StringValue': command,
            'DataType': 'String',
        },
    }

    if preprocessing:
        attributes['PreProcessing'] = {
            'StringValue': preprocessing,
            'DataType': 'String',
        }

    if postprocessing:
        attributes['PostProcessing'] = {
            'StringValue': postprocessing,
            'DataType': 'String',
        }

    if store_patterns:
        attributes['Store'] = {
            'StringValue': '|'.join(store_patterns),
            'DataType': 'String',
        }

    stats = queue.send_message(
        MessageBody='execution',
        MessageAttributes=attributes,
    )

    logger.info('Queued job "{}" from batch "{}" for runner "{}".'.format(
        stats['MessageId'], batch_id, runner_id))

def upload_batch(s3, runner_id, path,
                 exclude_patterns=[r'\.log$', r'\.nc$', r'\.pyc$']):
    '''Upload batch input to Amazon S3 Bucket

    Creates a unique batch ID and uploads the batch input under that
    ID to the Amazon S3 Bucket.

    Parameters
    ----------
    s3 : boto3 resource
        Boto3 S3 resource object in specified region
    runner_id : str
        Runner ID
    path : str
        Path to the batch input (file or directory)
    exclude_patterns : list, optional
        List of regular expressions; matching files are not uploaded

    Returns
    -------
    str
        Batch ID

    '''

    batch_id = str(uuid.uuid4())

    logger.info('Creating batch "{}"...'.format(batch_id))

    zfile = '{}.zip'.format(batch_id)
    zpath = os.path.abspath(os.path.join(path, '..', zfile))

    with zipfile.ZipFile(zpath, mode='w',
                         compression=zipfile.ZIP_DEFLATED) as zh:
        if os.path.isdir(path):
            for root, dirs, files in os.walk(path):
                for fname in files:
                    abspath = os.path.join(root, fname)

                    # check if any exclude pattern matches
                    if any([re.search(p, abspath) for p in exclude_patterns]):
                        continue

                    relpath = os.path.relpath(abspath, path)
                    zh.write(abspath, os.path.join(batch_id, relpath))
        else:
            zh.write(path, os.path.join(batch_id, os.path.split(path)[1]))

    logger.info('Created "{}".'.format(zpath))

    s3.Object(runner_id, zfile).upload_file(zpath)
    logger.info('Uploaded "{}" to bucket "{}".'.format(zfile, runner_id))

    os.unlink(zpath)
    logger.info('Removed "{}".'.format(zpath))

    return batch_id

### DOWNLOAD #########################################################

def download(runner_id, path, region_name=REGION_NAME, overwrite=False):
    '''Download batch results from Amazon S3 Bucket

    Parameters
    ----------
    runner_id : str
        Runner ID
    path : str
        Local download location
    region_name : str, optional
        Amazon region identifier
    overwrite : bool, optional
        Overwrite existing files

    '''

    s3 = boto3.resource('s3', region_name=region_name)

    for bucket in s3.buckets.all():
        if bucket.name == runner_id:
            for obj in bucket.objects.all():
                if not obj.key.endswith('.zip') and not obj.key.startswith('_'):

                    fpath, fname = os.path.split(obj.key)
                    downloadpath = os.path.join(path, fpath)
                    downloadfile = os.path.join(downloadpath, fname)

                    if not os.path.exists(downloadpath):
                        os.makedirs(downloadpath)

                    if not os.path.exists(downloadfile) or overwrite:
                        obj.meta.client.download_file(
                            bucket.name, obj.key, downloadfile)
                        logger.info('Downloaded "{}" to "{}"'.format(
                            obj.key, downloadpath))

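Once jobs have finished, results can be fetched to a local directory of choice; zip archives and bookkeeping keys prefixed with "_" are skipped:

    >>> adr.download(runner_id, 'results', overwrite=False)
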
### DESTROY ##########################################################

def destroy(runner_id, region_name=REGION_NAME, hosts=None):
    '''Destroy runner

    Delete Amazon SQS Message Queue associated with specified runner,
    terminate its workers and clean its Amazon S3 Bucket (but keep the
    batch results).

    Parameters
    ----------
    runner_id : str
        Runner ID
    region_name : str, optional
        Amazon region identifier
    hosts : list, optional
        List of specific hosts to destroy

    Notes
    -----
    No warnings, no undo!

    '''

    # delete queue
    sqs = boto3.resource('sqs', region_name=region_name)
    queue = sqs.get_queue_by_name(QueueName=runner_id)
    queue.delete()
    logger.info('Deleted queue "{}"'.format(runner_id))

    # terminate workers
    for instance in iterate_workers(runner_id, region_name=region_name,
                                    hosts=hosts):
        instance.terminate()
        logger.info('Terminated instance "{}"'.format(instance.instance_id))

    # clean bucket
    s3 = boto3.resource('s3', region_name=region_name)
    for bucket in s3.buckets.all():
        if bucket.name == runner_id:
            for obj in bucket.objects.all():
                if obj.key.endswith('.zip') or obj.key.startswith('_'):
                    obj.delete()
                    logger.info('Deleted key "{}"'.format(obj.key))

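Cleanup sketch: terminate the workers and remove the queue once the results are in; the stored batch results stay in the bucket:

    >>> adr.destroy(runner_id)              # no warnings, no undo
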
### LIST #############################################################

def get_runners(region_name=REGION_NAME):
    '''Get a list of valid runner IDs

    A valid runner ID is an ID with an active Amazon SQS Message Queue
    and Amazon S3 Bucket.

    Parameters
    ----------
    region_name : str, optional
        Amazon region identifier

    Returns
    -------
    list
        List of valid runner IDs

    See Also
    --------
    get_workers

    '''

    sqs = boto3.resource('sqs', region_name=region_name)
    s3 = boto3.resource('s3', region_name=region_name)

    queues = [os.path.split(q.url)[1] for q in sqs.queues.all()]

    runners = []
    for bucket in s3.buckets.all():
        if bucket.name in queues:
            runners.append(bucket.name)

    return runners

def get_workers(runner_id, region_name=REGION_NAME, prefix='_workers/'):
    '''Get list of public IP addresses of workers for specific runner

    Parameters
    ----------
    runner_id : str
        Runner ID
    region_name : str, optional
        Amazon region identifier
    prefix : str, optional
        Prefix used for registration of workers

    Returns
    -------
    list
        List of public IP addresses

    See Also
    --------
    get_runners
    register_workers
    deregister_workers

    '''

    s3 = boto3.resource('s3', region_name=region_name)

    workers = []
    for bucket in s3.buckets.all():
        if bucket.name == runner_id:
            for obj in bucket.objects.filter(Prefix=prefix):
                if obj.key != prefix:
                    workers.append(os.path.split(obj.key)[1])

    return workers

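Sketch of inspecting an account: list the active runners and the workers registered for one of them (assuming at least one runner exists):

    >>> runners = adr.get_runners()          # runner IDs with both queue and bucket
    >>> workers = adr.get_workers(runners[0])    # registered worker IP addresses
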
### HELPER ###########################################################

def key_exists(s3, runner_id, key):
    '''Check if key exists in Amazon S3 Bucket

    Parameters
    ----------
    s3 : boto3 resource
        Boto3 S3 resource object in specified region
    runner_id : str
        Runner ID
    key : str
        Key to be checked

    Returns
    -------
    bool
        Flag indicating existence of key

    '''

    try:
        s3.Object(runner_id, key).load()
    except botocore.exceptions.ClientError as e:
        if e.response['Error']['Code'] == "404":
            return False
        else:
            raise e

    return True

def find_root(files):
    '''Returns the common root of a collection of file paths

    Parameters
    ----------
    files : list
        List of file paths

    Returns
    -------
    str
        Common file root

    '''

    parts = [f.split(os.path.sep) for f in files]
    ix = [len(set(x)) <= 1 for x in zip(*parts)].index(False)
    root = os.path.sep.join(parts[0][:ix])

    logger.info('Determined root directory: "{}".'.format(root))

    return root

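For example, on a POSIX system two input paths reduce to their deepest common directory:

    >>> adr.find_root(['/data/runs/case1/aeolis.txt',
    ...                '/data/runs/case2/aeolis.txt'])
    '/data/runs'
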
def cache_contents(path):
    '''Cache current contents of directory

    Traverses a given directory and registers all contents in a hidden
    cache file.

    Parameters
    ----------
    path : str
        Path for which the contents need to be cached

    Returns
    -------
    str
        Cache filename

    See Also
    --------
    restore_contents

    '''

    cachepath = os.path.join(path, '.contents')
    with open(cachepath, 'w') as fp:
        for root, dirs, files in os.walk(path):
            for fname in files:
                fpath = os.path.abspath(os.path.join(root, fname))
                fp.write('{}\n'.format(fpath))

    return cachepath

def restore_contents(path):
    '''Restore contents of directory based on cache file

    Compares the current contents of a directory with the previously
    cached contents of that directory and removes any files that have
    been added since.

    Parameters
    ----------
    path : str
        Path for which the contents need to be restored

    See Also
    --------
    cache_contents

    '''

    cachepath = os.path.join(path, '.contents')
    if os.path.exists(cachepath):
        with open(cachepath, 'r') as fp:
            cache = [l.strip() for l in fp.readlines()]

        for root, dirs, files in os.walk(path):
            for fname in files:
                fpath = os.path.abspath(os.path.join(root, fname))
                if fpath not in cache:
                    os.unlink(fpath)

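These two helpers bracket a model run in process_job(): the contents of the batch directory are recorded before the run, and files added during the run are removed again afterwards (results are uploaded first). A local sketch with a hypothetical directory:

    >>> cache = adr.cache_contents('mybatch')     # writes mybatch/.contents
    >>> # ... run a job that creates new files in mybatch/ ...
    >>> adr.restore_contents('mybatch')           # removes files not listed in the cache
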
def iterate_workers(runner_id, region_name=REGION_NAME, hosts=None):
    '''Iterator for Amazon EC2 instances associated with a given runner

    Parameters
    ----------
    runner_id : str
        Runner ID
    region_name : str, optional
        Amazon region identifier
    hosts : list, optional
        List of specific hosts to iterate

    Yields
    ------
    boto3 EC2 instance
        Boto3 EC2 instance object associated with given runner

    '''

    ec2 = boto3.resource('ec2', region_name=region_name)
    for instance in ec2.instances.all():
        if hosts is None or instance.public_ip_address in hosts:
            # untagged instances have no tags attribute set
            tags = instance.tags or []
            if runner_id in [t['Value'] for t in tags if t['Key'] == 'Runner']:
                yield instance

def isiterable(lst):
    '''Checks if input is iterable'''

    try:
        iter(lst)
    except TypeError:
        return False
    return True