...
 
# -*- coding: utf-8 -*-
"""
@author <sylvain.herledan@oceandatalab.com>
@date 2019-01-10
"""
import sys
import json
import logging
import argparse
import datetime
import felyx_work.index.es
import felyx_work.utils.configuration
logger = logging.getLogger(__name__)


def parse_args(cli_args):
    """Parse command line arguments."""
    parser = argparse.ArgumentParser()
    parser.add_argument('datasets', type=str,
                        help='Colon-separated list of dataset identifiers')
    parser.add_argument('sites', type=str,
                        help='Colon-separated list of site identifiers')
    parser.add_argument('metrics', type=str,
                        help='Colon-separated list of metric identifiers to '
                             'extract from the index')
    parser.add_argument('start', type=str,
                        help='Beginning of the time coverage')
    parser.add_argument('stop', type=str,
                        help='End of the time coverage')
    parser.add_argument('--wkt', type=str, default=None,
                        help='Restrict search to the area defined by the '
                             'provided WKT string')
    parser.add_argument('--debug', action='store_true', default=False,
                        help='Print debug messages')
    parser.add_argument('--verbose', action='store_true', default=False,
                        help='Print informational messages')
    args = parser.parse_args(cli_args)
    return args


def felyx_get_metrics(cli_args=None):
    """Entry point for the felyx-get-metrics command."""
    # Setup logging
    main_logger = logging.getLogger()
    main_logger.handlers = []
    handler = logging.StreamHandler()
    handler.setLevel(logging.DEBUG)
    main_logger.addHandler(handler)
    main_logger.setLevel(logging.WARNING)

    args = parse_args(cli_args)
    sites = args.sites.split(':')
    datasets = args.datasets.split(':')
    metrics = args.metrics.split(':')
    start = args.start
    stop = args.stop

    wkt = None
    if args.wkt is not None:
        wkt = args.wkt

    if args.verbose:
        main_logger.setLevel(logging.INFO)
    if args.debug:
        main_logger.setLevel(logging.DEBUG)

    # Check dates format
    time_fmt = '%Y-%m-%d %H:%M:%S'
    ok = True
    try:
        _ = datetime.datetime.strptime(start, time_fmt)
    except ValueError:
        logger.error('Start not in the expected format: "{}"'.format(time_fmt))
        ok = False
    try:
        _ = datetime.datetime.strptime(stop, time_fmt)
    except ValueError:
        logger.error('Stop not in the expected format: "{}"'.format(time_fmt))
        ok = False
    if not ok:
        sys.exit(1)

    # TODO: let users define constraints from the command line
    constraints = None

    cfg = felyx_work.utils.configuration.get_felyx_config()
    es_prefix = cfg['es_prefix']
    index = felyx_work.index.es.ESFelyxIndex(es_prefix)
    values = index.search_metrics(sites, datasets, start, stop, wkt,
                                  constraints, metrics)
    print(json.dumps(values, indent=2))
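
# Illustrative direct call (the identifiers, dates and WKT below are
# placeholders, not values from the source). The console script invokes
# felyx_get_metrics() without arguments, so argparse reads sys.argv; an
# explicit cli_args list can be passed for programmatic use or tests:
#
#     felyx_get_metrics(['dataset_a:dataset_b', 'site_1:site_2', 'mean:std',
#                        '2019-01-01 00:00:00', '2019-01-31 00:00:00',
#                        '--wkt', 'POLYGON((0 0, 10 0, 10 10, 0 10, 0 0))',
#                        '--verbose'])
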
# -*- coding: utf-8 -*-
"""
@author <sylvain.herledan@oceandatalab.com>
@date 2019-01-10
"""
import sys
import json
import logging
import argparse
import datetime
import felyx_work.index.es
import felyx_work.utils.configuration
logger = logging.getLogger(__name__)


def parse_args(cli_args):
    """Parse command line arguments."""
    parser = argparse.ArgumentParser()
    parser.add_argument('dataset', type=str,
                        help='Dataset identifier')
    parser.add_argument('sites', type=str,
                        help='Colon-separated list of site identifiers')
    parser.add_argument('fields', type=str,
                        help='Colon-separated list of field identifiers to '
                             'extract from the index')
    parser.add_argument('start', type=str,
                        help='Beginning of the time coverage')
    parser.add_argument('stop', type=str,
                        help='End of the time coverage')
    parser.add_argument('--wkt', type=str, default=None,
                        help='Restrict search to the area defined by the '
                             'provided WKT string')
    parser.add_argument('--debug', action='store_true', default=False,
                        help='Print debug messages')
    parser.add_argument('--verbose', action='store_true', default=False,
                        help='Print informational messages')
    args = parser.parse_args(cli_args)
    return args


def felyx_get_raws(cli_args=None):
    """Entry point for the felyx-get-raws command."""
    # Setup logging
    main_logger = logging.getLogger()
    main_logger.handlers = []
    handler = logging.StreamHandler()
    handler.setLevel(logging.DEBUG)
    main_logger.addHandler(handler)
    main_logger.setLevel(logging.WARNING)

    args = parse_args(cli_args)
    dataset = args.dataset
    sites = args.sites.split(':')
    fields = args.fields.split(':')
    start = args.start
    stop = args.stop

    wkt = None
    if args.wkt is not None:
        wkt = args.wkt

    if args.verbose:
        main_logger.setLevel(logging.INFO)
    if args.debug:
        main_logger.setLevel(logging.DEBUG)

    # Check dates format
    time_fmt = '%Y-%m-%d %H:%M:%S'
    ok = True
    try:
        _ = datetime.datetime.strptime(start, time_fmt)
    except ValueError:
        logger.error('Start not in the expected format: "{}"'.format(time_fmt))
        ok = False
    try:
        _ = datetime.datetime.strptime(stop, time_fmt)
    except ValueError:
        logger.error('Stop not in the expected format: "{}"'.format(time_fmt))
        ok = False
    if not ok:
        sys.exit(1)

    # TODO: let users define constraints from the command line
    constraints = None

    cfg = felyx_work.utils.configuration.get_felyx_config()
    es_prefix = cfg['es_prefix']
    index = felyx_work.index.es.ESFelyxIndex(es_prefix)
    values = index.search_raws(sites, dataset, start, stop, wkt,
                               constraints, fields)
    print(json.dumps(values, indent=2))

@@ -91,3 +91,9 @@ class FelyxIndex:
                        metrics):
        """Search metrics matching the provided criteria."""
        pass

    @abc.abstractmethod
    def search_raws(self, sites, dataset, start, stop, wkt, constraints,
                    fields):
        """Search raw data matching the provided criteria."""
        pass

@@ -105,3 +105,13 @@ class ESFelyxIndex(felyx_work.index.FelyxIndex):
        if not ok:
            return None
        return result

    def search_raws(self, sites, dataset, start, stop, wkt, constraints,
                    fields):
        """Search raw data in the Elasticsearch index."""
        ok, result = felyx_work.index.es.raw.search(self.es_prefix, sites,
                                                    dataset, start, stop,
                                                    wkt, constraints, fields)
        if not ok:
            return None
        return result

@@ -298,7 +298,11 @@ def _search(fields, es_prefix, node_url=None, sites=None, datasets=None,
    if node_url is None:
        node_url = felyx_work.index.es.impl.get_online_node()

    if 'index_name' in kwargs:
        # for raw/insitu data
        index_path = '{}{}'.format(es_prefix, kwargs['index_name'])
    else:
        index_path = '{}metrics'.format(es_prefix)
    endpoint_url = '{}/{}/_search'.format(node_url, index_path)
    ok, response = felyx_work.index.es.impl.post_json(endpoint_url, query)

@@ -12,6 +12,7 @@ import calendar
import shapely.geometry
import felyx_work.index.es.impl
import felyx_work.index.es.metric
try:
    import simplejson as json

@@ -86,3 +87,33 @@ def get_trajectories(es_prefix, collection_code, sites_codes, start, stop,
        line = shapely.geometry.LineString(trajectories[site_code])
        trajectories[site_code] = list(line.simplify(tolerance).coords)
    return True, trajectories


def search(es_prefix, sites, dataset, dt_start=None, dt_end=None,
           wkt_coords=None, constraints_list=None, measurements=None,
           node_url=None):
    """Search raw (in situ) data in the Elasticsearch index."""
    fields = ['site', 'dataset', 'time_coverage', 'data.name', 'data.no_value',
              'data.num_value', 'data.bool_value', 'data.txt_value']
    index_name = 'raw_{}'.format(dataset)
    ok, response = felyx_work.index.es.metric._search(fields=fields,
                                                      es_prefix=es_prefix,
                                                      node_url=node_url,
                                                      sites=sites,
                                                      datasets=[dataset],
                                                      dt_start=dt_start,
                                                      dt_end=dt_end,
                                                      wkt_coords=wkt_coords,
                                                      constraints_list=constraints_list,
                                                      metrics=measurements,
                                                      index_name=index_name)
    if not ok:
        return ok, response

    json_response = response.json()
    output = felyx_work.index.es.metric._from_metrics_response(json_response,
                                                               measurements)
    return True, output
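
# Illustrative call (placeholder prefix, site and dataset identifiers, not
# from the source); documents for a dataset are expected under the
# '<es_prefix>raw_<dataset>' index built above:
#
#     ok, data = search('felyx_', ['site_1'], 'dataset_a',
#                       dt_start='2019-01-01 00:00:00',
#                       dt_end='2019-01-31 00:00:00',
#                       measurements=['sea_surface_temperature'])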

@@ -99,6 +99,8 @@ setup(
            'felyx-register-metrics = felyx_work.cli.register_metrics:felyx_register_metrics',
            'felyx-miniprod = felyx_work.cli.miniprod:felyx_miniprod',
            'felyx-metric = felyx_work.cli.metric:felyx_metric',
            'felyx-get-metrics = felyx_work.cli.get_metrics:felyx_get_metrics',
            'felyx-get-raws = felyx_work.cli.get_raws:felyx_get_raws',
        ],
    },
)
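
# Once the package is reinstalled (e.g. with "pip install -e ."), setuptools
# should expose the two new console scripts; the identifiers and dates below
# are placeholders only:
#
#     felyx-get-metrics dataset_a:dataset_b site_1:site_2 mean:std \
#         "2019-01-01 00:00:00" "2019-01-31 00:00:00"
#     felyx-get-raws dataset_a site_1:site_2 sea_surface_temperature \
#         "2019-01-01 00:00:00" "2019-01-31 00:00:00" --verbose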