Source code for merlin.chips

from base64 import b64decode
from cytoolz import drop
from cytoolz import first
from cytoolz import get_in
from cytoolz import reduce
from cytoolz import unique
from merlin import functions as f
from merlin import specs
from operator import add
import logging
import numpy as np
import requests

logger = logging.getLogger(__name__)


def locations(x, y, cw, ch, rx, ry, sx, sy):
    """Computes the [x, y] coordinate of every pixel in a chip.

    The chip origin (first row, first column) is placed at x, y.
    locations() does not snap() x & y... this should be done prior to
    calling locations() if needed.

    Args:
        x: x coordinate of the chip origin
        y: y coordinate of the chip origin
        cw: chip width in pixels (e.g. 100 pixels)
        ch: chip height in pixels (e.g. 100 pixels)
        rx: x reflection (e.g. 1)
        ry: y reflection (e.g. -1)
        sx: x scale (e.g. 3000 meters)
        sy: y scale (e.g. 3000 meters)

    Returns:
        a three dimensional numpy array of shape (ch, cw, 2) where the
        element at [row, column] is the [x, y] coordinate of that pixel
    """

    pw = (sx * rx) / cw  # pixel width, e.g. 30 meters
    ph = (sy * ry) / ch  # pixel height, e.g. -30 meters

    # Build each axis from an integer pixel index rather than from a
    # floating point start:stop:step range.  The number of elements is
    # then exactly cw (or ch) and cannot drift by one through rounding
    # of the computed end coordinate.
    xs = x + np.arange(cw) * pw
    ys = y + np.arange(ch) * ph

    # The matrix must be row major: y varies along rows and x along
    # columns.  meshgrid's default 'xy' indexing yields arrays shaped
    # (len(ys), len(xs)) == (ch, cw), which is that layout.  Swapping
    # the axes here would transpose the result.
    grid_x, grid_y = np.meshgrid(xs, ys)

    # Pair the coordinates on a trailing axis -> (ch, cw, 2).  The shape
    # is rows-by-columns so that non-square chips are laid out correctly.
    return np.stack((grid_x, grid_y), axis=-1)


def dates(chips):
    """Collects the acquisition date of every chip, in order.

    Args:
        chips: sequence of chips, each providing an 'acquired' value

    Returns:
        tuple: the 'acquired' value of each chip
    """

    return tuple(chip['acquired'] for chip in chips)


def trim(chips, dates):
    """Eliminates chips that are not from the specified dates

    Args:
        chips: Sequence of chips
        dates: Iterable of dates that should be included in result.
               It is consumed exactly once, so one-shot iterators and
               generators are accepted as well as sequences.

    Returns:
        tuple: filtered chips, in their original order
    """

    # Materialize first: testing membership directly against a one-shot
    # iterator would exhaust it on the first few chips and silently drop
    # every chip examined afterwards.
    wanted = tuple(dates)

    try:
        # Constant time lookups instead of a linear scan per chip.
        wanted = frozenset(wanted)
    except TypeError:
        # Unhashable date values: keep the tuple and compare by equality.
        pass

    return tuple(chip for chip in chips if chip['acquired'] in wanted)


def chip_to_numpy(chip, chip_spec):
    """Decodes the base64 payload of a chip into a shaped numpy array.

    The chip is modified in place: its 'data' entry is replaced by the
    decoded array, and the same chip object is returned.

    Args:
        chip: A chip whose 'data' entry is base64 encoded
        chip_spec: Corresponding chip_spec, supplying 'data_shape' and
                   'data_type'

    Returns:
        the chip that was passed in, with 'data' as a shaped numpy array
    """

    raw = b64decode(chip['data'])

    # The spec's type name is lowercased before being handed to numpy
    # as the dtype (e.g. 'UINT8' -> 'uint8').
    flat = np.frombuffer(raw, chip_spec['data_type'].lower())

    chip['data'] = flat.reshape(*chip_spec['data_shape'])
    return chip


def to_numpy(chips, spec_index):
    """Converts the data for a sequence of chips to numpy arrays

    Each chip is decoded with the chip_spec found under its 'ubid' in
    spec_index.  The result is a lazy map: no chip is decoded until the
    returned iterator is consumed.

    Args:
        chips (sequence): a sequence of chips
        spec_index (dict): chip_specs keyed by ubid

    Returns:
        iterator: chips with data as numpy arrays
    """

    def decode(chip):
        return chip_to_numpy(chip, spec_index[chip['ubid']])

    return map(decode, chips)


def identity(chip):
    """Builds the identity of a chip from its identifying fields.

    Args:
        chip (dict): A chip

    Returns:
        tuple: (x, y, ubid, acquired) values of the chip
    """

    fields = ('x', 'y', 'ubid', 'acquired')
    return tuple(chip[field] for field in fields)


def deduplicate(chips):
    """Removes duplicate chips from a sequence of chips.

    Two chips are duplicates when identity() yields the same value for
    both, i.e. they share an x, y, UBID and acquired date.

    Args:
        chips (sequence): Sequence of chips

    Returns:
        tuple: A nonduplicated tuple of chips
    """

    distinct = unique(chips, key=identity)
    return tuple(distinct)


def mapped(x, y, acquired, specmap, chips_fn):
    """Calls chips_fn once per specmap entry and keys the results alike.

    Args:
        x (int): x coordinate
        y (int): y coordinate
        acquired (str): iso8601 date range
        specmap (dict): map of specs; each value is handed to
                        specs.ubids() to obtain the ubids to request
        chips_fn (fn): function to return chips, called with the
                       keyword arguments x, y, acquired and ubids

    Returns:
        dict: {k: chips_fn()} for every key k of specmap
    """

    results = {}

    for name, chip_specs in specmap.items():
        results[name] = chips_fn(x=x,
                                 y=y,
                                 acquired=acquired,
                                 ubids=specs.ubids(chip_specs))

    return results


def rsort(chips, key=lambda c: c['acquired']):
    """Reverse sorts a sequence of chips, by acquired date by default.

    The ordering itself is delegated to merlin.functions.rsort.

    Args:
        chips: sequence of chips
        key: function returning the value a chip is ordered by;
             defaults to the chip's 'acquired' value

    Returns:
        tuple: the reverse sorted chips
    """

    ordered = f.rsort(chips, key=key)
    return tuple(ordered)