#!/usr/bin/env python
# encoding: utf-8
"""
@author: <sylvain.herledan@oceandatalab.com>
@date: 2016-06-27
"""
from __future__ import print_function
import tempfile
import argparse
import datetime
import requests
import logging
import shutil
import magic
import gzip
import sys
import os
try:
import simplejson as json
except ImportError:
import json
from felyx_work.data import dynamic_sites
logger = logging.getLogger()
handler = logging.StreamHandler()
handler.setLevel(logging.DEBUG)
logger.addHandler(handler)
logger.setLevel(logging.DEBUG)
def retrieve_input_file(uri):
""" """
# Create a temporary file to store the result
file_descriptor, file_path = tempfile.mkstemp(dir=tempfile.gettempdir())
os.close(file_descriptor)
try:
r = requests.get(uri, verify=False, stream=True)
except requests.ConnectionError:
_, e, _ = sys.exc_info()
logger.error('Could not connect to the server hosting "{}"'.format(uri))
logger.exception(e)
return None
if not r.ok:
logger.error('Could not retrieve "{}": server replied with status {}'.format(uri, r.status_code))
return None
with open(file_path, 'wb') as f:
r.raw.decode_content = True
shutil.copyfileobj(r.raw, f)
logger.debug('Input file downloaded from "{}" to "{}"'.format(uri, file_path))
return file_path
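# Illustrative invocation (hypothetical script name, input URL, ElasticSearch
# URL and chunk size; adjust to the actual deployment). The input may be a
# local path or an HTTP(S) URL:
#   python import_dynamic_sites.py http://example.org/drifters.csv.gz \
#       2016-06-27T12:00:00 http://localhost:9200 500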
if '__main__' == __name__:
parser = argparse.ArgumentParser(description='Import dynamic sites data from a CSV or gzipped CSV file into ElasticSearch')
parser.add_argument( 'input_file'
, nargs=1
, type=str
, help='Input file, CSV or gzipped CSV')
parser.add_argument( 'submitted'
, nargs=1
, type=str
, help='Datetime of the update request submission in YYYY-mm-ddTHH:MM:SS format')
parser.add_argument( 'es_url'
, nargs=1
, type=str
, help='URL of the ElasticSearch server')
parser.add_argument( 'chunk_size'
, nargs=1
, type=int
, help='Size of the chunks of trajectory sent to ElasticSearch in one request')
args = parser.parse_args()
input_file = args.input_file[0]
es_url = args.es_url[0]
submitted_str = args.submitted[0]
chunk_size = args.chunk_size[0]
# Validate submission datetime format
try:
datetime.datetime.strptime(args.submitted[0], '%Y-%m-%dT%H:%M:%S')
except ValueError:
logger.error('Submission date does not match the requested format YYYY-mm-ddTHH:MM:SS')
sys.exit(1)
if input_file.startswith('http://') or input_file.startswith('https://'):
# Retrieve input file with requests
file_path = retrieve_input_file(input_file)
else:
file_path = input_file
if file_path is None:
sys.exit(1)
# Check file availability
if not os.path.exists(file_path):
logger.error('File "{}" not found'.format(file_path))
sys.exit(1)
# Find which opener should be used
mime = magic.from_file(file_path, mime=True)
if mime.startswith('text/'):
opener = open
elif mime in ['application/x-gzip', 'application/gzip']:
opener = gzip.open
else:
if file_path == input_file:
logger.error('Unsupported MIME type "{}" detected for input file "{}"'.format(mime, input_file))
else:
logger.error('Unsupported MIME type "{}" detected for file "{}" which was downloaded from "{}"'.format(mime, file_path, input_file))
sys.exit(1)
ok, failures = dynamic_sites.import_from_csv(file_path, submitted_str, opener, es_url=es_url, chunk_size=chunk_size)
if not ok:
logger.error('Indexing failed.')
if 0 < len(failures):
logger.error('Errors occurred with the following sites: {}'.format(', '.join(failures)))
sys.exit(1)
#!/usr/bin/env python
# encoding: utf-8
"""
felyx-cache
===========
Command line tool to recache the local file copy of the felyx-server data.
:copyright: Copyright 2014 Pelamis Scientific Software Ltd.
:license: Released under GPL v3 license, see :ref:`license`.
.. sectionauthor:: David Poulter <david.poulter@pelamis.co.uk>
.. codeauthor:: David Poulter <david.poulter@pelamis.co.uk>
"""
# Set up basic logger - including a root logger for the imported modules.
from felyx_work.utils.common_logging import get_logger, clean_exit
ROOT_LOGGER = get_logger('felyx_work')
LOGGER = get_logger('felyx-cache')
import logging
import argparse
from felyx_work.felyx_server_interface import recache_data
parser = argparse.ArgumentParser(
description=str(
'Regenerate the shared local cache of the felyx-server '
'database. '
),
formatter_class=argparse.ArgumentDefaultsHelpFormatter
)
# Add verbosity control.
group = parser.add_mutually_exclusive_group()
group.add_argument(
"-v", "--verbose", action="store_true",
help="Activate debug level logging - for extra feedback."
)
group.add_argument(
"-q", "--quiet", action="store_true",
help="Disable information logging - for reduced feedback."
)
group.add_argument(
"-s", "--silent", action="store_true",
help="Log ONLY the most critical or fatal errors."
)
args = parser.parse_args()
# Set up verbosity option.
if args.verbose:
LOGGER.setLevel(logging.DEBUG)
elif args.quiet:
LOGGER.setLevel(logging.WARNING)
elif args.silent:
LOGGER.setLevel(logging.FATAL)
else:
LOGGER.setLevel(logging.INFO)
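# Propagate the chosen verbosity to the felyx_work root logger so that
# messages emitted by the imported library modules follow the same level.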
ROOT_LOGGER.setLevel(LOGGER.level)
# noinspection PyBroadException
try:
LOGGER.info(
"Starting static data re-cache."
)
recache_data()
LOGGER.info(
"Re-cached all non dynamic position data."
)
except Exception:
LOGGER.exception(
"Recache failed - cause information follows:\n Exiting."
)
clean_exit(1)
#!/usr/bin/env python
# encoding: utf-8
"""
@author: <sylvain.herledan@oceandatalab.com>
"""
from __future__ import print_function
from felyx_work.data.trajectory import get_shapes, get_trajectories
from felyx_work.export.csv_exporter import serialize_sites, serialize_dynamic, serialize_static
from felyx_work.felyx_server_interface.local import LocalFelyxData
import argparse
import logging
import datetime
import sys
import os
logger = logging.getLogger()
handler = logging.StreamHandler()
logger.addHandler(handler)
logger.setLevel(logging.WARN)
if '__main__' == __name__:
parser = argparse.ArgumentParser()
parser.add_argument( '-f'
, '--format'
, help='Output format'
, type=str
, required=True
, default='csv')
parser.add_argument( '--collection'
, help='Code of the collection to export'
, required=True
, type=str)
parser.add_argument( '--sites'
, help='Comma-separated list of the sites to export'
, type=str)
parser.add_argument( '--begin'
, help='Beginning of the exported time window (in YYYY-mm-ddTHH:MM:SS format)'
, required=False
, type=str)
parser.add_argument( '--end'
, help='End of the exported time window (in YYYY-mm-ddTHH:MM:SS format)'
, required=False
, type=str)
parser.add_argument( '-o'
, '--output'
, help='Output path'
, type=str
, required=False)
group = parser.add_mutually_exclusive_group(required=False)
group.add_argument( '--verbose'
, help='Show more log messages during processing'
, action='store_true'
, default=False)
group.add_argument( '--debug'
, help='Show all log messages during processing'
, action='store_true'
, default=False)
args = parser.parse_args()
output_format = args.format
collection = args.collection
if args.verbose:
logger.setLevel(logging.INFO)
elif args.debug:
logger.setLevel(logging.DEBUG)
sites = None
if args.sites is not None:
sites = [x.strip() for x in args.sites.split(',')]
begin_dt = '1970-01-01T00:00:00'
if args.begin is not None:
try:
_ = datetime.datetime.strptime(args.begin, '%Y-%m-%dT%H:%M:%S')
except ValueError:
_, e, _ = sys.exc_info()
logger.error('Value provided to --begin does not match the expected format')
logger.debug(str(e))
sys.exit(1)
begin_dt = args.begin
end_dt = '2100-01-01T00:00:00'
if args.end is not None:
try:
_ = datetime.datetime.strptime(args.end, '%Y-%m-%dT%H:%M:%S')
except ValueError:
_, e, _ = sys.exc_info()
logger.error('Value provided to --end does not match the expected format')
logger.debug(str(e))
sys.exit(1)
end_dt = args.end
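# When --begin/--end are omitted, begin_dt and end_dt keep their sentinel
# defaults (1970-01-01 and 2100-01-01), which effectively exports the whole
# time range available in the backend.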
output = None
if args.output is not None:
output = args.output
logger.debug('output: {}'.format(output))
logger.debug('output_format: {}'.format(output_format))
logger.debug('collection: {}'.format(collection))
logger.debug('sites: {}'.format(sites))
logger.debug('begin_dt: {}'.format(begin_dt))
logger.debug('end_dt: {}'.format(end_dt))
# Find out if the requested collection is static or dynamic
local_server = LocalFelyxData()
ref_objs = [ x for x in local_server._cache['site_collection'] if x['code'].lower() == collection.lower() ]
if 0 >= len(ref_objs):
logger.error('Could not find information about collection {} in the backend database'.format(collection))
sys.exit(1)
ref_obj = ref_objs[0]
# List sites for the reference collection if the --sites option is left empty
if sites is None:
sites = [local_server.site(site_id=x)['code'] for x in ref_obj['site'] ]
# Extract and serialize data
if ref_obj['static']:
# Retrieve shapes from backend
vertices = get_shapes(sites)
ok = serialize_sites(vertices, output, serialize_static)
else:
# Retrieve trajectories from backend
vertices = get_trajectories(collection, sites, begin_dt, end_dt, tolerance=-1)
ok = serialize_sites(vertices, output, serialize_dynamic)
if not ok:
logger.error('Serialization step failed')
sys.exit(1)
#!/usr/bin/env python
# encoding: utf-8
"""
@author <sylvain.herledan@oceandatalab.com>
"""
import argparse
import requests
import datetime
import json
import sys
from felyx_work.utils.common_logging import clean_exit
from felyx_work.utils.configuration import get_es_server_url
es_metric_api = get_es_server_url()
if es_metric_api.endswith('/'):
es_metric_api = es_metric_api[:-1]
es_url = es_metric_api[:es_metric_api.rfind('/')]
# Miniprods
# -----------------------------------------------------------------------------
miniprods_url = '%s/miniprod' % es_url
miniprods_query = { 'query':
{ 'constant_score':
{ 'filter': {}
}
}
, 'fields': ['_id', 'time_coverage_start']
, 'sort': [ {'time_coverage_start': {'order': 'asc'}} ]
}
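# The query above is a constant_score filter (populated at runtime) that only
# requests the _id and time_coverage_start fields, sorted by ascending start
# time, so paginated responses stay small.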
# Metrics
# -----------------------------------------------------------------------------
metrics_url = '%s/metric' % es_url
metrics_query = { "query":
{ "constant_score":
{ "filter":
{ "and": [
{ "has_parent":
{ "parent_type": "miniprod"
, "query":
{"ids":
{ "type": "miniprod"
, "values": []
}
}
}
}
, { "term": {"metric_name": "mean_sst"}}
]
}
}
}
, "fields": ["value", "_parent"]
}
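# The metric query combines a has_parent clause (metrics whose parent miniprod
# _id belongs to the list injected at runtime) with a term filter on
# metric_name, and only returns the value and _parent fields. The parent/child
# syntax matches the legacy ElasticSearch 1.x mapping this script appears to
# target (assumption based on the query structure).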
#
#------------------------------------------------------------------------------
def create_miniprods_filter(dataset, collection, site, start, stop, metric_name, with_metrics=True):
""" """
filters = []
if dataset is not None:
filters.append({'term': {'dataset': dataset}})
if collection is not None:
filters.append({'term': {'collection': collection}})
if site is not None:
filters.append({'term': {'site': site}})
if None not in [start, stop]:
# Validate time format
try:
datetime.datetime.strptime(start[0], '%Y-%m-%dT%H:%M:%S')
datetime.datetime.strptime(stop[0], '%Y-%m-%dT%H:%M:%S')
except ValueError:
_, e, _ = sys.exc_info()
print >> sys.stderr, str(e)
clean_exit(1)
filters.append({'range': {'time_coverage_stop': {'gte': start[0]}}})
filters.append({'range': {'time_coverage_start': {'lte': stop[0]}}})
elif start is not None:
try:
datetime.datetime.strptime(start[0], '%Y-%m-%dT%H:%M:%S')
except ValueError:
_, e, _ = sys.exc_info()
print >> sys.stderr, str(e)
clean_exit(1)
filters.append({'range': {'time_coverage_stop': {'gte': start[0]}}})
elif stop is not None:
try:
datetime.datetime.strptime(stop[0], '%Y-%m-%dT%H:%M:%S')
except ValueError:
_, e, _ = sys.exc_info()
print >> sys.stderr, str(e)
clean_exit(1)
filters.append({'range': {'time_coverage_start': {'lte': stop[0]}}})
with_metrics_filter = { 'has_child': { 'type': 'metric', 'query': { 'term': { 'metric_name': metric_name } } } }
if with_metrics:
filters.append(with_metrics_filter)
else:
filters.append({'not': with_metrics_filter})
if 0 >= len(filters):
return None
if 1 < len(filters):
return {'and': filters}
return filters[0]
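# Illustrative behaviour: with a single criterion the corresponding term or
# range filter is returned on its own; with several criteria they are wrapped
# in an 'and' filter together with the has_child clause (or its negation when
# with_metrics is False); with no criterion at all the function returns None.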
def list_miniprods_metrics(miniprods_ids, offset=0, size=1000, ids=None):
""" !!! Size must not exceed 1024 """
metrics_query['query']['constant_score']['filter']['and'][0]['has_parent']['query']['ids']['values'] = miniprods_ids
metrics_query['from'] = offset
metrics_query['size'] = size
r = requests.post( '%s/_search' % metrics_url
, data=json.dumps(metrics_query))
if r.ok:
result = json.loads(r.text)
else:
raise Exception('Could not execute query\n%s' % r.text)
hits = result['hits']['hits']
metrics_ids = map(lambda x: {'miniprod': x['fields']['_parent'], 'value':x['fields']['value'][0]}, hits)
ids.extend(metrics_ids)
# Handle pagination
total = result['hits']['total']
return offset+size, total
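# Pagination is driven by the caller: keep calling with the returned offset
# until it reaches the reported total (see the while loops in __main__ below).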
def list_miniprods(url, query, offset, size, ids, times):
""" Size must not exceed 1024 """
query['from'] = offset
query['size'] = size
r = requests.post( '%s/_search' % url
, data=json.dumps(query))
if r.ok:
result = json.loads(r.text)
else:
raise Exception('Could not execute query\n%s' % r.text)
hits = result['hits']['hits']
miniprods_ids = map(lambda x: x['_id'], hits)
miniprods_times = map(lambda x: x['fields']['time_coverage_start'][0], hits)
ids.extend(miniprods_ids)
times.extend(miniprods_times)
# Handle pagination
total = result['hits']['total']
return offset + size, total
#
#------------------------------------------------------------------------------
if '__main__' == __name__:
#collection = 'Initiative-PIM'
#site = 'TEKK000'
#dataset = 'arc-upa-l2p-atsr2-v2.1'
parser = argparse.ArgumentParser()
parser.add_argument('metric_name', nargs=1, type=str, help='metric name')
parser.add_argument( '--dataset', nargs=1, type=str, help='dataset ID')
parser.add_argument( '--collection', nargs=1, type=str
, help='collection name')
parser.add_argument( '--site', nargs=1, type=str, help='site code')
parser.add_argument( '--start', nargs=1, type=str, help='start date (YYYY-mm-ddTHH:MM:SS format)')
parser.add_argument( '--stop', nargs=1, type=str, help='stop date (YYYY-mm-ddTHH:MM:SS format)')
parser.add_argument( '--dry-run', action='store_true', default=False)
args = parser.parse_args()
# Update metric_name in metrics query
metrics_query["query"]["constant_score"]["filter"]["and"][1]["term"]["metric_name"] = args.metric_name[0]
miniprods_filters = create_miniprods_filter( args.dataset
, args.collection
, args.site
, args.start
, args.stop
, args.metric_name[0]
, False)
if miniprods_filters is None:
print >> sys.stderr, 'You must specify at least one of --dataset, --collection or --site'
clean_exit(1)
miniprods_query['query']['constant_score']['filter'] = miniprods_filters
miniprods = []
nometric_times = []
done = 0
total = 1
while done < total:
done, total = list_miniprods( miniprods_url, miniprods_query
, done, 500, miniprods, nometric_times)
empty_metrics = [ {x: 'null'} for x in nometric_times ]
no_metrics_miniprods = miniprods
#print empty_metrics
unique_nometric_times = len(list(set(nometric_times)))
miniprods_filters = create_miniprods_filter( args.dataset
, args.collection
, args.site
, args.start
, args.stop
, args.metric_name[0]
, True)
if miniprods_filters is None:
print >> sys.stderr, 'You must specify at least one of --dataset, --collection or --site'
clean_exit(1)
miniprods_query['query']['constant_score']['filter'] = miniprods_filters
miniprods = []
times = []
done = 0
total = 1
while done < total:
done, total = list_miniprods( miniprods_url, miniprods_query
, done, 500, miniprods, times)
total_times = len(times)
times_map = dict(list(set(zip(miniprods, times))))
#print times_map
#print miniprods
miniprods_count = total
unique_miniprods_count = len(list(set(miniprods)))
metrics = []
chunk_size = 500
i = 0
l = total
while i < l:
miniprods_chunk = miniprods[i:i+chunk_size]
done = 0
total = 1
while done < total:
done, total = list_miniprods_metrics( miniprods_chunk, done, 1000, metrics)
#print '%i metrics done for offset %i' % (len(metrics), i)
i = i + chunk_size
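# Join each metric value with the start time of its parent miniprod, then add
# the placeholder entries created for miniprods that have no metric at all.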
timed_metrics = [{times_map[x['miniprod']]: x['value']} for x in metrics]
timed_metrics.extend(empty_metrics)
sorted_metrics = sorted(timed_metrics, key=lambda x: x.keys()[0])
for i in xrange(0, len(sorted_metrics)):
miniprod = sorted_metrics[i]
t = miniprod.keys()[0]
print '%s\t%s' % (t, str(miniprod[t]))
print '-' * 80
print '%i miniprods with metric (%i duplicates)' % (miniprods_count, miniprods_count - unique_miniprods_count)
print '%i miniprods with no metric (%i duplicates)' % (len(empty_metrics), len(empty_metrics) - unique_nometric_times)
print '%i miniprods in total' % len(timed_metrics)
#print [x for x in miniprods if x in no_metrics_miniprods]
......@@ -13,26 +13,23 @@ A command line tool to index metric information in the archive to ElasticSearch
.. codeauthor:: David Poulter <david.poulter@pelamis.co.uk>
"""
from felyx_work.utils.common_logging import get_logger, clean_exit
ROOT_LOGGER = get_logger('felyx_work')
LOGGER = get_logger('felyx-metric-index')
from felyx_work.utils.configuration import get_felyx_config, \
get_updated_felyx_celery, get_es_server_url
import argparse
import os
import re
import sys
import pyes
import json
import logging
import socket
import os
import pyes
import re
import logging
import argparse
from felyx_work.utils.configuration import get_felyx_config, \
get_updated_felyx_celery, get_es_server_url
from felyx_work.miniprod.metric import EMPTY_INDEX, POPULATED_INDEX, \
metric_document_id
LOGGER = logging.getLogger(__name__)
# Get generic felyx configuration
DEFAULT_GRANULARITY = 1000
MINIPROD_ROOT_PATH, MINIPROD_PUBLIC_USER_NAME, DEFAULT_USER = get_felyx_config(
......@@ -335,6 +332,13 @@ verbosity_group.add_argument(
)
arguments = parser.parse_args()
main_logger = logging.getLogger()
handler = logging.StreamHandler()
handler.setLevel(logging.DEBUG)
main_logger.addHandler(handler)
main_logger.setLevel(logging.INFO)
# Set up verbosity option.
if arguments.verbose:
LOGGER.setLevel(logging.DEBUG)
......@@ -344,13 +348,12 @@ elif arguments.silent:
LOGGER.setLevel(logging.FATAL)
else:
LOGGER.setLevel(logging.INFO)
ROOT_LOGGER.setLevel(LOGGER.level)
if arguments.metric_name and arguments.no_metrics:
LOGGER.critical(
"You cannot specify a metric-name and specify no-metrics."
)
clean_exit(1)
sys.exit(1)
if arguments.mode == 'remote':
celery = get_updated_felyx_celery()
......@@ -410,9 +413,9 @@ if arguments.mode == 'remote':
"avaialble. Exiting."
).format(clean_broker_info)
)
clean_exit(1)
sys.exit(1)
clean_exit()
sys.exit(0)
elif arguments.mode == 'local':
pass
......
......@@ -9,7 +9,6 @@ import datetime
import json
import sys
import os
from felyx_work.utils.common_logging import clean_exit
from felyx_work.utils.configuration import get_es_server_url
es_metric_api = get_es_server_url()
......@@ -114,7 +113,7 @@ def create_miniprods_filter(dataset, collection, site, start, stop):
except ValueError:
_, e, _ = sys.exc_info()
print >> sys.stderr, str(e)
clean_exit(1)
sys.exit(1)
filters.append({'range': {'time_coverage_start': {'gte': start[0], 'lte': stop[0]}}})
elif start is not None:
try:
......@@ -122,7 +121,7 @@ def create_miniprods_filter(dataset, collection, site, start, stop):
except ValueError:
_, e, _ = sys.exc_info()
print >> sys.stderr, str(e)
clean_exit(1)
sys.exit(1)
filters.append({'range': {'time_coverage_start': {'gte': start[0]}}})
elif stop is not None:
try:
......@@ -130,7 +129,7 @@ def create_miniprods_filter(dataset, collection, site, start, stop):
except ValueError:
_, e, _ = sys.exc_info()
print >> sys.stderr, str(e)
clean_exit(1)
sys.exit(1)
filters.append({'range': {'time_coverage_start': {'lte': stop[0]}}})
if 0 >= len(filters):
......@@ -225,7 +224,7 @@ if '__main__' == __name__:
, args.stop)
if miniprods_filters is None:
print >> sys.stderr, 'You must specify a dataset and/or a site and/or a collection'
clean_exit(1)
sys.exit(1)
miniprods_query['query']['constant_score']['filter'] = miniprods_filters
if not args.dry_run:
......@@ -239,7 +238,7 @@ if '__main__' == __name__:
confirm_msg += '\n\nAre you sure you want to continue?'
if not confirm(confirm_msg):
clean_exit(0)
sys.exit(0)
to_delete = []
metrics_count = 0
......
# encoding: utf-8
"""
felyx-cache
===========
Command line tool to recache the local file copy of the felyx-server data.
:copyright: Copyright 2014 Pelamis Scientific Software Ltd.
:license: Released under GPL v3 license, see :ref:`license`.
.. sectionauthor:: David Poulter <david.poulter@pelamis.co.uk>
.. codeauthor:: David Poulter <david.poulter@pelamis.co.uk>
"""
from __future__ import print_function
import sys
import logging
import argparse
import felyx_work.felyx_server_interface
logger = logging.getLogger()
handler = logging.StreamHandler()
logger.addHandler(handler)
logger.setLevel(logging.INFO)
def parse_args(cli_args):
"""Parse options for the cache update script."""
parser = argparse.ArgumentParser(
description=str(
'Regenerate the shared local cache of the felyx-server '
'database. '
),
formatter_class=argparse.ArgumentDefaultsHelpFormatter
)
# Add verbosity control.
group = parser.add_mutually_exclusive_group()
group.add_argument(
"-v", "--verbose", action="store_true",
help="Activate debug level logging - for extra feedback."
)
group.add_argument(
"-q", "--quiet", action="store_true",
help="Disable information logging - for reduced feedback."
)
group.add_argument(
"-s", "--silent", action="store_true",
help="Log ONLY the most critical or fatal errors."
)
args = parser.parse_args(cli_args)
return args
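# cli_args is forwarded to argparse (None means sys.argv), which keeps the
# parser testable and suits use as a packaged console entry point.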
def felyx_cache(cli_args=None):
""""""
args = parse_args(cli_args)
# Set up verbosity option.
if args.verbose:
logger.setLevel(logging.DEBUG)
elif args.quiet:
logger.setLevel(logging.WARNING)
elif args.silent:
logger.setLevel(logging.FATAL)
logger.info("Starting static data re-cache.")
try:
felyx_work.felyx_server_interface.recache_data()
except Exception:
logger.exception("Recache failed - Exiting")
sys.exit(1)
logger.info("Re-cached all non dynamic position data.")
......@@ -13,7 +13,7 @@ logger.addHandler(handler)
logger.setLevel(logging.INFO)
def parse_args():
def parse_args(cli_args):
"""Parse options for the installation script."""
descr = ''' Felyx backend diagnostic utility.'''
......@@ -29,14 +29,14 @@ def parse_args():
help='Check the felyx-worker script')
parser.add_argument('--debug', action='store_true', default=False,
help='Print debug messages')
args = parser.parse_args()
args = parser.parse_args(cli_args)
return args
def felyx_diagnose():
def felyx_diagnose(cli_args=None):
""""""
args = parse_args()
args = parse_args(cli_args)
if args.debug:
logger.setLevel(logging.DEBUG)
......
......@@ -38,7 +38,6 @@ def parse_args(cli_args):
parser.add_argument('--verbose', action='store_true', default=False,
help='')
args = parser.parse_args()
args = parser.parse_args(cli_args)
return args
......
......@@ -38,7 +38,6 @@ def parse_args(cli_args):
parser.add_argument('--verbose', action='store_true', default=False,
help='')
args = parser.parse_args()
args = parser.parse_args(cli_args)
return args
......
......@@ -57,7 +57,7 @@ def felyx_import_csv(cli_args=None):
if args.debug:
main_logger.setLevel(logging.DEBUG)
logger.info('Ingesting sites information from {} into Felyx')
logger.info('Ingesting sites information from {} into Felyx'.format(file_path))
if (args.type == felyx_work.sites.INSITU) and (args.dataset is None):
logger.error('You must specify the dataset identifier with --dataset '
......
# -*- encoding: utf-8 -*-
"""
@author: <sylvain.herledan@oceandatalab.com>
@date: 2020-02-10
"""
import sys
import logging
import argparse
import felyx_work.sites.insitu
logger = logging.getLogger(__name__)
def parse_args(cli_args):
""""""
parser = argparse.ArgumentParser()
parser.add_argument('dataset_id', type=str,
help='Identifier of the in-situ dataset')
parser.add_argument('mappings_path', type=str,
help='Path of the mappings file to register')
parser.add_argument('--verbose', action='store_true', default=False)
parser.add_argument('--debug', action='store_true', default=False)
args = parser.parse_args(cli_args)
return args
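# Illustrative invocation (hypothetical command name, dataset identifier and
# mappings path; the real entry point name depends on the packaging):
#   felyx-register-insitu-mappings drifter_sst /etc/felyx/mappings.json --verbose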
def felyx_register_insitu_mappings(cli_args=None):
""""""
# Setup logging
main_logger = logging.getLogger()
main_logger.handlers = []
handler = logging.StreamHandler()
handler.setLevel(logging.DEBUG)
main_logger.addHandler(handler)
main_logger.setLevel(logging.WARN)
args = parse_args(cli_args)
if args.verbose:
main_logger.setLevel(logging.INFO)
if args.debug:
main_logger.setLevel(logging.DEBUG)
dataset_id = args.dataset_id
mappings_path = args.mappings_path
ok = felyx_work.sites.insitu.register_mappings(dataset_id, mappings_path)
if not ok:
sys.exit(1)
logger.info('Done')
sys.exit(0)
......@@ -85,7 +85,7 @@ def has_worker_capabilities():
return True
def parse_args():
def parse_args(cli_args):
"""Parse options for the installation script."""
descr = ''' Felyx backend installation utility.'''
......@@ -117,7 +117,7 @@ def parse_args():
parser.add_argument('--debug', action='store_true', default=False,
help='Print verbose log messages for debugging. '
'WARNING: passwords will be printed in plain text')
args = parser.parse_args()
args = parser.parse_args(cli_args)
return args
......@@ -363,9 +363,9 @@ def parse_celery_config(cfg_path, user_info):
return True, msg
def felyx_install():
def felyx_install(cli_args=None):
""""""
args = parse_args()
args = parse_args(cli_args)
if args.debug:
logger.setLevel(logging.DEBUG)
......
#!/usr/bin/env python
# encoding: utf-8
# -*- encoding: utf-8 -*-
"""
felyx-assemble-matchups
========================
A command line tool to assemble full match-ups from miniprods stored in the archive
A command line tool to assemble full match-ups from miniprods stored in the
archive
:copyright: Copyright 2015 Ifremer.
:license: Released under GPL v3 license, see :ref:`license`.
......@@ -13,19 +13,17 @@ A command line tool to assemble full match-ups from miniprods stored in the arch
.. codeauthor:: Jeff Piolle <jfpiolle@ifremer.fr>
"""
import argparse
import os
import sys
import logging
import argparse
import ConfigParser
import os
from dateutil.parser import parse
import dateutil.parser
from felyx_work.utils.matchup import build
from felyx_work.utils.common_logging import get_logger, clean_exit
LOGGER = get_logger('felyx-assemble-matchup')
INSITU_LOGGER = get_logger('felyx_in_situ')
LOGGER = logging.getLogger(__name__)
def read_configuration(cfgfile):
......@@ -35,13 +33,13 @@ def read_configuration(cfgfile):
if not os.path.exists(cfgfile):
logger.error("Configuration file {} is not existing".format(
cfgfile))
exit(-1)
sys.exit(-1)
config.read(cfgfile)
return config
if '__main__' == __name__:
def parse_args(cli_args):
""""""
parser = argparse.ArgumentParser(
description=str(
'Produce a matchup dataset from the miniprods in the felyx archive'
......@@ -61,7 +59,8 @@ if '__main__' == __name__:
parser.add_argument(
'--collection-prefixes', nargs="+", default=None, help=str(
'The prefix used to uniquely identify the sites of this collection.'
'The prefix used to uniquely identify the sites of this '
'collection.'
' If no prefix is used, it means that the same site identifiers '
'were used for all the matched datasets.'
), dest='site_collection_prefixes'
......@@ -88,7 +87,7 @@ if '__main__' == __name__:
)
parser.add_argument(
'-c', '--configuration', default=None, type=str,
'-c', '--configuration', default=None, type=str,
help=str(
'A configuration file with additional settings such as the list of '
'variables to extract.'
......@@ -108,7 +107,8 @@ if '__main__' == __name__:
dyn_group = parser.add_argument_group(
'dynamic data',
'adding dynamic sites data, if existing, to the matchups')
dyn_group.add_argument('--add-dynamic-data', action='store_true', default=False)
dyn_group.add_argument('--add-dynamic-data', action='store_true',
default=False)
dyn_group.add_argument(
'--history', default=0, type=int, help=str(
'the time window of the history of dynamic site data that will be '
......@@ -123,10 +123,13 @@ if '__main__' == __name__:
)
parser.add_argument('--start', type=str, help='start date (YYYY-mm-ddTHH:MM:SS format)')
parser.add_argument('--stop', type=str, help='stop date (YYYY-mm-ddTHH:MM:SS format)')
parser.add_argument('--start', type=str,
help='start date (YYYY-mm-ddTHH:MM:SS format)')
parser.add_argument('--stop', type=str,
help='stop date (YYYY-mm-ddTHH:MM:SS format)')
parser.add_argument('--dry-run', action='store_true', default=False)
parser.add_argument('-l', '--log', type=str, help='path to the log file', default=None)
parser.add_argument('-l', '--log', type=str, help='path to the log file',
default=None)
# Add verbosity control.
verbosity_group = parser.add_mutually_exclusive_group()
......@@ -143,7 +146,22 @@ if '__main__' == __name__:
help="Log ONLY the most critical or fatal errors."
)
arguments = parser.parse_args()
arguments = parser.parse_args(cli_args)
return arguments
def felyx_assemble_matchups(cli_args=None):
""""""
# Setup logging
main_logger = logging.getLogger()
handler = logging.StreamHandler()
handler.setLevel(logging.DEBUG)
main_logger.addHandler(handler)
main_logger.setLevel(logging.INFO)
arguments = parse_args(cli_args)
# Set up verbosity option.
if arguments.verbose:
LOGGER.setLevel(logging.DEBUG)
......@@ -153,25 +171,24 @@ if '__main__' == __name__:
LOGGER.setLevel(logging.FATAL)
else:
LOGGER.setLevel(logging.INFO)
INSITU_LOGGER.setLevel(LOGGER.level)
# set log file
if arguments.log is not None:
hdlr = logging.FileHandler(arguments.log)
LOGGER.addHandler(hdlr)
LOGGER.addHandler(hdlr)
# time selection
try:
start = None
if arguments.start:
start = parse(str(arguments.start))
start = dateutil.parser.parse(str(arguments.start))
end = None
if arguments.stop:
end = parse(arguments.stop)
end = dateutil.parser.parse(str(arguments.stop))
except ValueError:
_, e, _ = sys.exc_info()
print >> sys.stderr, str(e)
clean_exit(1)
LOGGER.error(str(e))
sys.exit(1)
# reference dataset: the dataset to which all the other requested datasets
# are matched = the first of the provided list
......@@ -180,11 +197,11 @@ if '__main__' == __name__:
[] if len(arguments.datasets_name) == 0
else arguments.datasets_name[1:]
)
# additional settings
settings = {'variables': {_: None for _ in arguments.datasets_name}}
if arguments.configuration is not None:
config = read_configuration(arguments.configuration)
config = read_configuration(arguments.configuration)
# list of variables to extract from each dataset
if config.has_section('variables'):
for dataset in arguments.datasets_name:
......@@ -194,7 +211,7 @@ if '__main__' == __name__:
).split(',')
]
settings['variables'][dataset] = varlist
build(reference,
start,
end,
......@@ -209,4 +226,3 @@ if '__main__' == __name__:
output_dir=arguments.output_dir,
variables=settings['variables']
)
......@@ -26,7 +26,7 @@ def parse_args(cli_args):
parser.add_argument('--verbose', action='store_true', default=False)
parser.add_argument('--debug', action='store_true', default=False)
args = parser.parse_args()
args = parser.parse_args(cli_args)
return args
......
......@@ -26,7 +26,7 @@ def parse_args(cli_args):
parser.add_argument('--verbose', action='store_true', default=False)
parser.add_argument('--debug', action='store_true', default=False)
args = parser.parse_args()
args = parser.parse_args(cli_args)
return args
......
......@@ -53,7 +53,7 @@ def serialize_sites(sites, output_path, serialization_method):
else:
output_fd = open(output_path, 'w')
writer = unicodecsv.writer(output_fd, encoding='utf-8', delimiter=';')
writer = unicodecsv.writer(output_fd, encoding='utf-8', delimiter=';')
for site_code, positions in sites.iteritems():
# Retrieve site information
site_obj = local_server.site(site_code=site_code)
......
......@@ -76,11 +76,11 @@ def dict_from_Data(miniprod):
_time_coverage_start = get_attr('time_coverage_start')
time_coverage_start = datetime.datetime.strptime(_time_coverage_start,
'%Y%m%dT%H%M%S')
'%Y%m%dT%H%M%SZ')
_time_coverage_stop = get_attr('time_coverage_end')
time_coverage_stop = datetime.datetime.strptime(_time_coverage_stop,
'%Y%m%dT%H%M%S')
'%Y%m%dT%H%M%SZ')
result = {'site': site_id,
'source': source,
......
......@@ -16,8 +16,6 @@ information.
import logging
LOGGER = logging.getLogger(__name__)
from felyx_work.utils.common_logging import clean_exit
import cPickle
import tempfile
import os
......@@ -62,7 +60,7 @@ def felyx_server_data(remote=False):
"recorded as: \n{}\nExiting."
).format(problem.message)
)
clean_exit(1)
raise
def recache_data():
......
......@@ -24,7 +24,6 @@ from shapely.wkt import loads as geometry
from felyx_work.miniprod.shape import get_felyx_shape
from felyx_work.felyx_server_interface.base import FelyxServerData,\
NoLocalCache, BadLocalCache
from felyx_work.utils.common_logging import clean_exit
from felyx_work.utils.configuration import get_felyx_config
......@@ -33,6 +32,22 @@ FELYX_CONFIG = None
COLLECTION_LEVELS = {'public': ['community', 'local'], 'private': ['custom']}
class DuplicateNameCollection(ValueError):
"""Error raised upon detection of several site collections sharing the same
name."""
pass
class SiteCollectionNotFound(ValueError):
"""Error raised when an unknown site collection has been requested."""
pass
class InvalidMatchingDatasetsCount(ValueError):
"""Error raised when the number of datasets found is not exactly 1."""
pass
def get_local_cache_filename():
"""Return the name of the default local cache file.
......@@ -139,7 +154,7 @@ class LocalFelyxData(FelyxServerData):
"felyx_server instance. Exiting."
).format(len(result), dataset_name)
)
clean_exit(1)
raise InvalidMatchingDatasetsCount()
elif dataset_id is not None:
# Return values for the unique dataset_id
......@@ -163,7 +178,7 @@ class LocalFelyxData(FelyxServerData):
dataset_name if dataset_name is not None else dataset_id
)
)
clean_exit(1)
raise
def metric(self, dataset_name):
"""
......@@ -271,18 +286,18 @@ class LocalFelyxData(FelyxServerData):
]
if len(collection) > 1:
LOGGER.critical(
"Multiple ite collections with the name {}. Exiting.".format(
"Multiple site collections with the name {}. Exiting.".format(
site_collection_name
)
)
clean_exit(1)
raise DuplicateNameCollection()
elif not len(collection):
LOGGER.critical(
"No site collection with the name {}. Exiting.".format(
site_collection_name
)
)
clean_exit(1)
raise SiteCollectionNotFound()
else:
return collection[0]['id']
......@@ -305,14 +320,14 @@ class LocalFelyxData(FelyxServerData):
site_collection_code
)
)
clean_exit(1)
raise DuplicateNameCollection()
elif not len(collection):
LOGGER.critical(
"No site collection with the name {}. Exiting.".format(
site_collection_code
)
)
clean_exit(1)
raise SiteCollectionNotFound()
else:
return collection[0]['name']
......
......@@ -13,11 +13,10 @@ instance. Functionally equivalent to any class that inherits from
.. sectionauthor:: David Poulter <david.poulter@pelamis.co.uk>
.. codeauthor:: David Poulter <david.poulter@pelamis.co.uk>
"""
import sys
import logging
from felyx_work.miniprod.shape import get_felyx_shape
from felyx_work.utils.common_logging import clean_exit
LOGGER = logging.getLogger(__name__)
......@@ -67,7 +66,7 @@ class RemoteFelyxData(FelyxServerData):
LOGGER.critical(
"default_felyx_api absent from felyx config file. Exiting."
)
clean_exit(1)
sys.exit(1)
else:
self._felyx_api = felyx_api
......@@ -131,7 +130,7 @@ class RemoteFelyxData(FelyxServerData):
"to that server was possible. Exiting."
).format(query)
)
clean_exit(1)
sys.exit(1)
if return_data.status_code != requests.codes.ok:
LOGGER.debug(
......@@ -151,7 +150,7 @@ class RemoteFelyxData(FelyxServerData):
"server. Exiting."
).format(return_data.status_code)
)
clean_exit(1)
sys.exit(1)
elif return_data.status_code >= 400:
LOGGER.critical(
str(
......@@ -163,7 +162,7 @@ class RemoteFelyxData(FelyxServerData):
" Exiting."
).format(return_data.status_code, query)
)
clean_exit(1)
sys.exit(1)
else:
LOGGER.debug(
str(
......@@ -181,7 +180,7 @@ class RemoteFelyxData(FelyxServerData):
"contact the felyx developers, with the details of "
"the following text. Now exiting."
)
clean_exit(1)
sys.exit(1)
# Log any errors as returned from tastypie
if 'error' in return_data:
......@@ -192,7 +191,7 @@ class RemoteFelyxData(FelyxServerData):
'\nExiting.'
).format(query, return_data['error'])
)
clean_exit(1)
sys.exit(1)
# Add results
if 'objects' in return_data:
......@@ -269,7 +268,7 @@ class RemoteFelyxData(FelyxServerData):
dataset_name if dataset_name is not None else dataset_id
)
)
clean_exit(1)
sys.exit(1)
def metric(self, dataset_name):
"""
......@@ -361,7 +360,7 @@ class RemoteFelyxData(FelyxServerData):
"Found {} and not 1 instances of site with code {}."
).format(len(result), site_code)
)
clean_exit(1)
sys.exit(1)
return result[0]
elif site_id is not None:
......@@ -376,7 +375,7 @@ class RemoteFelyxData(FelyxServerData):
"Found {} and not 1 instances of site with id {}."
).format(len(result), site_id)
)
clean_exit(1)
sys.exit(1)
return result
else:
return None
......
......@@ -339,7 +339,7 @@ def bulk_delete_buffer(index_path, ids, cmds_buffer):
return True
def bulk(cmds_buffer, timeout=10, query_string=None):
def bulk(cmds_buffer, timeout=180, query_string=None):
""""""
node_url = get_online_node()
endpoint_url = '{}/_bulk'.format(node_url)
......
......@@ -304,13 +304,11 @@ def _search(fields, es_prefix, node_url=None, sites=None, datasets=None,
else:
index_path = '{}metrics'.format(es_prefix)
endpoint_url = '{}/{}/_search'.format(node_url, index_path)
ok, response = felyx_work.index.es.impl.post_json(endpoint_url, query)
if ok is False:
return False, response
return True, response
query['size'] = 5000
ok, response = felyx_work.index.es.impl.search(index_path, query,
timeout=10,
query_string='scroll=5m')
return ok, response
def _from_metrics_response(es_response, metrics):
......@@ -327,7 +325,7 @@ def _from_metrics_response(es_response, metrics):
# uniquely identify one miniprod in most cases).
tmp = {}
_available_metrics = {}
for hit in es_response['hits']['hits']:
for hit in es_response:
dataset = hit['_source']['dataset']
site = hit['_source']['site']
start_str = hit['_source']['time_coverage']['gte']
......@@ -416,6 +414,5 @@ def search(es_prefix, sites, datasets=None, dt_start=None, dt_end=None,
if not ok:
return ok, response
json_response = response.json()
output = _from_metrics_response(json_response, metrics)
output = _from_metrics_response(response, metrics)
return True, output
......@@ -152,7 +152,7 @@ def _search(fields, es_prefix, node_url=None, sites=None, datasets=None,
if dt_start is not None:
if isinstance(dt_start, datetime.datetime):
dt_start = dt_start.strftime(time_fmt)
dt_start = dt_start.strftime(TIME_FMT)
dt_start = dt_start.replace('T', ' ').replace('Z', '')
if dt_end is not None:
......@@ -198,13 +198,11 @@ def _search(fields, es_prefix, node_url=None, sites=None, datasets=None,
index_path = '{}miniprods'.format(es_prefix)
endpoint_url = '{}/{}/_search'.format(node_url, index_path)
ok, response = felyx_work.index.es.impl.post_json(endpoint_url, query)
if ok is False:
return False, response
return True, response
query['size'] = 5000
ok, response = felyx_work.index.es.impl.search(index_path, query,
timeout=10,
query_string='scroll=5m')
return ok, response
def search(es_prefix, sites, datasets=None