Source code for astroplan.scheduling

# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""
Tools for scheduling observations.
"""

from __future__ import (absolute_import, division, print_function,
                        unicode_literals)

import copy
from abc import ABCMeta, abstractmethod

import numpy as np

from astropy import units as u
from astropy.time import Time
from astropy.table import Table

from .utils import time_grid_from_range, stride_array
from .constraints import AltitudeConstraint

__all__ = ['ObservingBlock', 'TransitionBlock', 'Schedule', 'Slot', 'Scheduler',
           'SequentialScheduler', 'PriorityScheduler', 'Transitioner', 'Scorer']


class ObservingBlock(object):
    """
    An observation to be scheduled, consisting of a target and associated
    constraints on observations.
    """

    @u.quantity_input(duration=u.second)
    def __init__(self, target, duration, priority, configuration=None, constraints=None):
        """
        Parameters
        ----------
        target : `~astroplan.FixedTarget`
            Target to observe

        duration : `~astropy.units.Quantity`
            exposure time

        priority : integer or float
            priority of this object in the target list. 1 is highest priority,
            no maximum

        configuration : dict or None
            Configuration metadata. ``None`` (the default) is stored as a new
            empty dict.

        constraints : list of `~astroplan.constraints.Constraint` objects
            The constraints to apply to this particular observing block.  Note
            that constraints applicable to the entire list should go into the
            scheduler.
        """
        self.target = target
        self.duration = duration
        self.priority = priority
        # A fresh dict per instance: a ``{}`` literal in the signature would be
        # one object shared by every block created without a configuration.
        self.configuration = {} if configuration is None else configuration
        self.constraints = constraints
        self.start_time = self.end_time = None
        self.observer = None

    def __repr__(self):
        orig_repr = object.__repr__(self)
        if self.start_time is None or self.end_time is None:
            return orig_repr.replace('object at',
                                     '({0}, unscheduled) at'
                                     .format(self.target.name))
        else:
            s = '({0}, {1} to {2}) at'.format(self.target.name, self.start_time,
                                              self.end_time)
            return orig_repr.replace('object at', s)

    @property
    def constraints_scores(self):
        """
        Mapping of each constraint in ``self.constraints`` to the value it
        returns when called with the observer, this block's target and the
        block's start and end times.

        Returns ``None`` while the block has no start time, no duration or no
        observer attached.
        """
        # Explicit ``is None`` tests: the truth value of a Quantity is
        # ambiguous and must not be relied upon.
        if self.start_time is None or self.duration is None:
            return None
        # TODO: setup a way of caching or defining it as an attribute during scheduling
        if not self.observer:
            return None
        times = [self.start_time, self.start_time + self.duration]
        # ``constraints`` defaults to None; treat that as "no constraints".
        return {constraint: constraint(self.observer, [self.target], times=times)
                for constraint in (self.constraints or [])}

    @classmethod
    def from_exposures(cls, target, priority, time_per_exposure,
                       number_exposures, readout_time=0 * u.second,
                       configuration=None, constraints=None):
        """
        Build a block whose duration is
        ``number_exposures * (time_per_exposure + readout_time)``.

        The three exposure parameters are also stored on the returned block
        as attributes of the same names.
        """
        duration = number_exposures * (time_per_exposure + readout_time)
        ob = cls(target, duration, priority, configuration, constraints)
        ob.time_per_exposure = time_per_exposure
        ob.number_exposures = number_exposures
        ob.readout_time = readout_time
        return ob
class Scorer(object):
    """
    Returns scores and score arrays from the evaluation of constraints on
    observing blocks
    """

    def __init__(self, blocks, observer, schedule, global_constraints=None):
        """
        Parameters
        ----------
        blocks : list of `~astroplan.scheduling.ObservingBlock` objects
            list of blocks that need to be scored
        observer : `~astroplan.Observer`
            the observer
        schedule : `~astroplan.scheduling.Schedule`
            The schedule inside which the blocks should fit
        global_constraints : list of `~astroplan.Constraint` objects or None
            any ``Constraint`` that applies to all the blocks. ``None`` (the
            default) means no global constraints.
        """
        self.blocks = blocks
        self.observer = observer
        self.schedule = schedule
        # New list per instance; a ``[]`` default in the signature would be a
        # single list shared by every Scorer built without this argument.
        self.global_constraints = [] if global_constraints is None else global_constraints

    def create_score_array(self, time_resolution=1*u.minute):
        """
        this makes a score array over the entire schedule for all of the
        blocks and each `~astroplan.Constraint` in the .constraints of
        each block and in self.global_constraints.

        Parameters
        ----------
        time_resolution : `~astropy.units.Quantity`
            the time between each scored time

        Returns
        -------
        score_array : `~numpy.ndarray`
            array with dimensions (# of blocks, schedule length /
            ``time_resolution``)
        """
        start = self.schedule.start_time
        end = self.schedule.end_time
        times = time_grid_from_range((start, end), time_resolution)
        # Start from 1 everywhere and multiply every constraint result in.
        score_array = np.ones((len(self.blocks), len(times)))
        for i, block in enumerate(self.blocks):
            # TODO: change the default constraints from None to []
            if block.constraints:
                for constraint in block.constraints:
                    # one target is passed, so take the first row of the result
                    applied_score = constraint(self.observer, [block.target],
                                               times=times)[0]
                    score_array[i] *= applied_score
        targets = [block.target for block in self.blocks]
        for constraint in self.global_constraints:
            score_array *= constraint(self.observer, targets, times)
        return score_array

    @classmethod
    def from_start_end(cls, blocks, observer, start_time, end_time,
                       global_constraints=None):
        """
        for if you don't have a schedule/ aren't inside a scheduler

        A new `~astroplan.scheduling.Schedule` spanning ``start_time`` to
        ``end_time`` is created and handed to the constructor.
        """
        dummy_schedule = Schedule(start_time, end_time)
        sc = cls(blocks, observer, dummy_schedule, global_constraints)
        return sc
class TransitionBlock(object):
    """
    Parameterizes the "dead time", e.g. between observations, while the
    telescope is slewing, instrument is reconfiguring, etc.
    """

    def __init__(self, components, start_time=None):
        """
        Parameters
        ----------
        components : dict
            A dictionary mapping the reason for an observation's dead time to
            `~astropy.units.Quantity` objects with time units

        start_time : `~astropy.time.Time` or None
            Start time of the transition
        """
        self._components = None
        self.duration = None
        self.start_time = start_time
        # goes through the property setter, which also fills ``duration``
        self.components = components

    def __repr__(self):
        orig_repr = object.__repr__(self)
        comp_info = ', '.join(['{0}: {1}'.format(c, t)
                               for c, t in self.components.items()])
        if self.start_time is None or self.end_time is None:
            return orig_repr.replace('object at', ' ({0}, unscheduled) at'.format(comp_info))
        else:
            s = '({0}, {1} to {2}) at'.format(comp_info, self.start_time, self.end_time)
            return orig_repr.replace('object at', s)

    @property
    def end_time(self):
        """``start_time + duration``, or ``None`` while ``start_time`` is unset."""
        if self.start_time is None:
            # unscheduled: ``None + duration`` would raise a TypeError
            return None
        return self.start_time + self.duration

    @property
    def components(self):
        """The dict of dead-time components given at construction or last set."""
        return self._components

    @components.setter
    def components(self, val):
        # ``duration`` is kept equal to the sum of all component times.
        duration = 0*u.second
        for t in val.values():
            duration += t

        self._components = val
        self.duration = duration

    @classmethod
    @u.quantity_input(duration=u.second)
    def from_duration(cls, duration):
        """
        Create a block with a single component, keyed ``'duration'``.
        """
        # for testing how to put transitions between observations during
        # scheduling without considering the complexities of duration
        # ``cls`` rather than the hard-coded class so subclasses get instances
        # of their own type.
        tb = cls({'duration': duration})
        return tb
class Schedule(object):
    """
    An object that represents a schedule, consisting of a list of
    `~astroplan.scheduling.Slot` objects.
    """
    # as currently written, there should be no consecutive unoccupied slots
    # this should change to allow for more flexibility (e.g. dark slots, grey slots)

    def __init__(self, start_time, end_time, constraints=None):
        """
        Parameters
        -----------
        start_time : `~astropy.time.Time`
            The starting time of the schedule; the start of your
            observing window
        end_time : `~astropy.time.Time`
           The ending time of the schedule; the end of your
           observing window
        constraints : sequence of `~astroplan.constraints.Constraint` s
           these are constraints that apply to the entire schedule. They are
           stored on the instance; no method of this class reads them.
        """
        self.start_time = start_time
        self.end_time = end_time
        # keep the argument instead of silently discarding it
        self.constraints = constraints
        # a new schedule is one single, empty slot covering the whole window
        self.slots = [Slot(start_time, end_time)]
        self.observer = None

    def __repr__(self):
        return ('Schedule containing ' + str(len(self.observing_blocks)) +
                ' observing blocks between ' + str(self.slots[0].start.iso) +
                ' and ' + str(self.slots[-1].end.iso))

    @property
    def observing_blocks(self):
        """Blocks held by the slots that are ``ObservingBlock`` instances, in slot order."""
        return [slot.block for slot in self.slots if isinstance(slot.block, ObservingBlock)]

    @property
    def scheduled_blocks(self):
        """Every truthy ``slot.block``, in slot order."""
        return [slot.block for slot in self.slots if slot.block]

    @property
    def open_slots(self):
        """Slots whose ``occupied`` flag is false."""
        return [slot for slot in self.slots if not slot.occupied]

    def to_table(self, show_transitions=True, show_unused=False):
        """
        Build an `~astropy.table.Table` with one row per reported slot.

        Slots whose block has a ``target`` attribute are always reported.
        Slots holding another block are reported as ``'TransitionBlock'`` when
        ``show_transitions`` is true, and slots with no block as
        ``'Unused Time'`` when ``show_unused`` is true.
        """
        # TODO: allow different coordinate types
        target_names = []
        start_times = []
        end_times = []
        durations = []
        ra = []
        dec = []
        config = []
        for slot in self.slots:
            block = slot.block
            if hasattr(block, 'target'):
                name = block.target.name
                slot_ra = block.target.ra
                slot_dec = block.target.dec
                slot_config = block.configuration
            elif show_transitions and block:
                name = 'TransitionBlock'
                slot_ra = slot_dec = ''
                # list every component except the slew
                slot_config = [c for c in block.components.keys() if c != 'slew_time']
            elif block is None and show_unused:
                name = 'Unused Time'
                slot_ra = slot_dec = slot_config = ''
            else:
                continue
            start_times.append(slot.start.iso)
            end_times.append(slot.end.iso)
            durations.append(slot.duration.to(u.minute).value)
            target_names.append(name)
            ra.append(slot_ra)
            dec.append(slot_dec)
            config.append(slot_config)
        return Table([target_names, start_times, end_times, durations, ra, dec, config],
                     names=('target', 'start time (UTC)', 'end time (UTC)',
                            'duration (minutes)', 'ra', 'dec', 'configuration'))

    def new_slots(self, slot_index, start_time, end_time):
        """Return the result of splitting ``self.slots[slot_index]`` at the given times."""
        # this is intended to be used such that there aren't consecutive unoccupied slots
        new_slots = self.slots[slot_index].split_slot(start_time, end_time)
        return new_slots

    def insert_slot(self, start_time, block):
        """
        Place ``block`` into the slot containing ``start_time``.

        The chosen slot is split, the piece covering the block is marked
        occupied and given the block, and ``self.slots`` is replaced. The
        block's ``start_time`` (and ``end_time`` for an ``ObservingBlock``) is
        set; its duration may be replaced by the slot duration when the two
        differ by less than one second.

        Returns
        -------
        slots : list of `~astroplan.scheduling.Slot`
            A list with the same contents as the new ``self.slots``.

        Raises
        ------
        ValueError
            If no slot contains ``start_time`` or the block is more than one
            second longer than the slot.
        """
        # due to float representation, this will change block start time
        # and duration by up to 1 second in order to fit in a slot
        tolerance = 1*u.second
        slot_index = None
        for j, slot in enumerate(self.slots):
            if ((slot.start < start_time or abs(slot.start-start_time) < tolerance)
                    and (slot.end > start_time + tolerance)):
                slot_index = j
        if slot_index is None:
            # previously surfaced as an UnboundLocalError further down
            raise ValueError('no slot contains the requested start time')
        target_slot = self.slots[slot_index]

        if (block.duration - target_slot.duration) > tolerance:
            raise ValueError('longer block than slot')
        elif target_slot.end - block.duration < start_time:
            # pull the start back so the block ends with the slot
            start_time = target_slot.end - block.duration

        # The comparison lives outside ``abs()``; it used to sit inside it,
        # which took the absolute value of a boolean.
        if abs(target_slot.duration - block.duration) < tolerance:
            block.duration = target_slot.duration
            start_time = target_slot.start
            end_time = target_slot.end
        elif abs(target_slot.start - start_time) < tolerance:
            start_time = target_slot.start
            end_time = start_time + block.duration
        elif abs(target_slot.end - start_time - block.duration) < tolerance:
            end_time = target_slot.end
        else:
            end_time = start_time + block.duration
        if isinstance(block, ObservingBlock):
            # TODO: make it shift observing/transition blocks to fill small amounts of open space
            block.end_time = start_time+block.duration
        earlier_slots = self.slots[:slot_index]
        later_slots = self.slots[slot_index+1:]
        block.start_time = start_time
        new_slots = self.new_slots(slot_index, start_time, end_time)
        for new_slot in new_slots:
            if new_slot.middle:
                new_slot.occupied = True
                new_slot.block = block
        self.slots = earlier_slots + new_slots + later_slots
        return earlier_slots + new_slots + later_slots

    def change_slot_block(self, slot_index, new_block=None):
        """
        Replace the block in ``self.slots[slot_index]`` with ``new_block`` and
        resize the slot to the new block's duration, moving the start of the
        following slot accordingly.

        Raises
        ------
        IndexError
            If there is no following slot or it already holds a block. The
            schedule is left unmodified in that case.
        """
        # currently only written to work for TransitionBlocks in PriorityScheduler
        # made with the assumption that the slot afterwards is open and that the
        # start time will remain the same
        slot = self.slots[slot_index]
        new_end = slot.start + new_block.duration
        following = self.slots[slot_index + 1]
        # validate before mutating so a failure cannot leave two slots overlapping
        if following.block:
            raise IndexError('slot afterwards is full')
        slot.end = new_end
        slot.block = new_block
        following.start = new_end
class Slot(object):
    """
    A time slot consisting of a start and end time
    """

    def __init__(self, start_time, end_time):
        """
        Parameters
        -----------
        start_time : `~astropy.time.Time`
            The starting time of the slot
        end_time : `~astropy.time.Time`
            The ending time of the slot
        """
        self.start = start_time
        self.end = end_time
        # a new slot is empty and is not flagged as the piece cut out by a split
        self.occupied = False
        self.middle = False
        self.block = None

    @property
    def duration(self):
        """Length of the slot, ``end - start``."""
        return self.end - self.start

    def split_slot(self, early_time, later_time):
        """
        Cut the interval ``early_time`` .. ``later_time`` out of this slot.

        Returns a list of new slots in time order: an optional leading
        remainder, the requested slot (flagged ``middle``), and an optional
        trailing remainder. A remainder is only present when the requested
        interval leaves room on that side.

        Raises
        ------
        ValueError
            If this slot is already occupied.
        """
        # an occupied slot must not be overwritten
        if self.occupied:
            raise ValueError('slot is already occupied')

        requested = Slot(early_time, later_time)
        requested.middle = True

        pieces = []
        if early_time > self.start:
            pieces.append(Slot(self.start, early_time))
        pieces.append(requested)
        if later_time < self.end:
            pieces.append(Slot(later_time, self.end))
        return pieces
class Scheduler(object):
    """
    Schedule a set of `~astroplan.scheduling.ObservingBlock` objects
    """

    # NOTE(review): ``__metaclass__`` is only honoured by Python 2; on
    # Python 3 this assignment has no effect, so ``@abstractmethod`` below is
    # not enforced there. Left as-is to keep instantiation behaviour unchanged.
    __metaclass__ = ABCMeta

    @u.quantity_input(gap_time=u.second, time_resolution=u.second)
    def __init__(self, constraints, observer, transitioner=None,
                 gap_time=5*u.min, time_resolution=20*u.second):
        """
        Parameters
        ----------
        constraints : sequence of `~astroplan.constraints.Constraint`
            The constraints to apply to *every* observing block.  Note that
            constraints for specific blocks can go on each block individually.
        observer : `~astroplan.Observer`
            The observer/site to do the scheduling for.
        transitioner : `~astroplan.scheduling.Transitioner` (required)
            The object to use for computing transition times between blocks.
            Leaving it as ``None`` will cause an error.
        gap_time : `~astropy.units.Quantity` with time units
            The maximum length of time a transition between ObservingBlocks
            could take.
        time_resolution : `~astropy.units.Quantity` with time units
            The smallest factor of time used in scheduling, all Blocks scheduled
            will have a duration that is a multiple of it.

        Raises
        ------
        ValueError
            If ``transitioner`` is not a `~astroplan.scheduling.Transitioner`.
        """
        self.constraints = constraints
        self.observer = observer
        self.transitioner = transitioner
        if not isinstance(self.transitioner, Transitioner):
            raise ValueError("A Transitioner is required")
        self.gap_time = gap_time
        self.time_resolution = time_resolution

    def __call__(self, blocks, schedule):
        """
        Schedule a set of `~astroplan.scheduling.ObservingBlock` objects.

        Parameters
        ----------
        blocks : list of `~astroplan.scheduling.ObservingBlock` objects
            The observing blocks to schedule.  The scheduling code works on
            *shallow* copies of these blocks, so attributes are rebound on the
            copies, but mutable objects the blocks refer to are shared with
            the input.
        schedule : `~astroplan.scheduling.Schedule` object
            A schedule that the blocks will be scheduled in. At this time
            the ``schedule`` must be empty, only defined by a start and
            end time.

        Returns
        -------
        schedule : `~astroplan.scheduling.Schedule`
            A schedule objects which consists of `~astroplan.scheduling.Slot`
            objects with and without populated ``block`` objects containing either
            `~astroplan.scheduling.TransitionBlock` or `~astroplan.scheduling.ObservingBlock`
            objects with populated ``start_time`` and ``end_time`` or ``duration`` attributes
        """
        self.schedule = schedule
        self.schedule.observer = self.observer
        # these are *shallow* copies
        copied_blocks = [copy.copy(block) for block in blocks]
        schedule = self._make_schedule(copied_blocks)
        return schedule

    @abstractmethod
    def _make_schedule(self, blocks):
        """
        Does the actual business of scheduling. The ``blocks`` passed in should
        have their ``start_time`` and ``end_time`` modified to reflect the
        schedule. Any necessary `~astroplan.scheduling.TransitionBlock` should
        also be added to the schedule, which is then returned.

        Parameters
        ----------
        blocks : list of `~astroplan.scheduling.ObservingBlock` objects
            Can be modified as it is already copied by ``__call__``

        Returns
        -------
        schedule : `~astroplan.scheduling.Schedule`
            A schedule objects which consists of `~astroplan.scheduling.Slot`
            objects with and without populated ``block`` objects containing either
            `~astroplan.scheduling.TransitionBlock` or `~astroplan.scheduling.ObservingBlock`
            objects with populated ``start_time`` and ``end_time`` or ``duration`` attributes.
        """
        raise NotImplementedError

    @classmethod
    @u.quantity_input(duration=u.second)
    def from_timespan(cls, center_time, duration, **kwargs):
        """
        Create a new instance of this class given a center time and duration.

        Parameters
        ----------
        center_time : `~astropy.time.Time`
            Mid-point of time-span to schedule.

        duration : `~astropy.units.Quantity` or `~astropy.time.TimeDelta`
            Duration of time-span to schedule
        """
        start_time = center_time - duration / 2.
        end_time = center_time + duration / 2.
        # NOTE(review): the two times are forwarded as the first two positional
        # arguments of ``cls``; for this class's ``__init__`` those positions
        # are ``constraints`` and ``observer``. Confirm the intended signature
        # before relying on this constructor.
        return cls(start_time, end_time, **kwargs)
class SequentialScheduler(Scheduler):
    """
    A scheduler that does "stupid simple sequential scheduling".  That is, it
    simply looks at all the blocks, picks the best one, schedules it, and then
    moves on.
    """

    def __init__(self, *args, **kwargs):
        super(SequentialScheduler, self).__init__(*args, **kwargs)

    def _make_schedule(self, blocks):
        """
        Fill ``self.schedule`` front to back.

        Starting at the schedule start time, every remaining block is scored
        at the current time (product of all its constraints at the start,
        middle and end of the block); the best-scoring block is inserted,
        preceded by its transition if there is one. When the best score is
        zero the current time advances by ``self.gap_time`` instead.

        Parameters
        ----------
        blocks : list of `~astroplan.scheduling.ObservingBlock` objects
            Blocks to schedule; scheduled blocks are removed from this list.

        Returns
        -------
        schedule : `~astroplan.scheduling.Schedule`
            ``self.schedule`` with the new slots inserted.
        """
        pre_filled = np.array([[block.start_time, block.end_time] for
                               block in self.schedule.scheduled_blocks])
        if len(pre_filled) == 0:
            # nothing is scheduled yet: use placeholder times that lie before
            # the schedule start so the checks below have something to compare
            a = self.schedule.start_time
            filled_times = Time([a - 1*u.hour, a - 1*u.hour,
                                 a - 1*u.minute, a - 1*u.minute])
            pre_filled = filled_times.reshape((2, 2))
        else:
            filled_times = Time(pre_filled.flatten())
            pre_filled = filled_times.reshape((int(len(filled_times)/2), 2))

        # Work on copies: the scheduler-wide list and the per-block lists
        # (shared with the caller's blocks through the shallow copy) must not
        # grow as a side effect of scheduling.
        global_constraints = [] if self.constraints is None else list(self.constraints)
        for b in blocks:
            own = None if b.constraints is None else list(b.constraints)
            combined = global_constraints + (own or [])
            # to make sure the scheduler has some constraint to work off of
            # and to prevent scheduling of targets below the horizon
            # TODO : change default constraints to [] and switch to append
            if not any(isinstance(c, AltitudeConstraint) for c in combined):
                combined.append(AltitudeConstraint(min=0 * u.deg))
                b.constraints = (own or []) + [AltitudeConstraint(min=0 * u.deg)]
            b._all_constraints = combined
            # offsets of the start, middle and end of the block
            b._duration_offsets = u.Quantity([0*u.second, b.duration/2,
                                              b.duration])
            b.observer = self.observer

        current_time = self.schedule.start_time
        while (len(blocks) > 0) and (current_time < self.schedule.end_time):
            # first compute the value of all the constraints for each block
            # given the current starting time
            block_transitions = []
            block_constraint_results = []
            # the schedule does not change while the candidates are scored,
            # so look the previous observing block up once
            scheduled = self.schedule.observing_blocks
            for b in blocks:
                # first figure out the transition
                if len(scheduled) > 0:
                    trans = self.transitioner(scheduled[-1], b, current_time,
                                              self.observer)
                else:
                    trans = None
                block_transitions.append(trans)
                transition_time = 0*u.second if trans is None else trans.duration

                times = current_time + transition_time + b._duration_offsets

                # make sure it isn't in a pre-filled slot
                if (any((current_time < filled_times) & (filled_times < times[2])) or
                        any(abs(pre_filled.T[0]-current_time) < 1*u.second)):
                    block_constraint_results.append(0)

                else:
                    constraint_res = []
                    for constraint in b._all_constraints:
                        constraint_res.append(constraint(self.observer, [b.target],
                                                         times))
                    # take the product over all the constraints *and* times
                    block_constraint_results.append(np.prod(constraint_res))

            # now identify the block that's the best
            bestblock_idx = np.argmax(block_constraint_results)

            if block_constraint_results[bestblock_idx] == 0.:
                # if even the best is unobservable, we need a gap
                current_time += self.gap_time
            else:
                # If there's a best one that's observable, first get its transition
                trans = block_transitions.pop(bestblock_idx)
                if trans is not None:
                    self.schedule.insert_slot(trans.start_time, trans)
                    current_time += trans.duration

                # now assign the block itself times and add it to the schedule
                newb = blocks.pop(bestblock_idx)
                newb.start_time = current_time
                current_time += newb.duration
                newb.end_time = current_time
                newb.constraints_value = block_constraint_results[bestblock_idx]

                self.schedule.insert_slot(newb.start_time, newb)

        return self.schedule
class PriorityScheduler(Scheduler):
    """
    A scheduler that optimizes a prioritized list.  That is, it
    finds the best time for each ObservingBlock, in order of priority.
    """

    def __init__(self, *args, **kwargs):
        """
        Accepts the same arguments as `~astroplan.scheduling.Scheduler`.
        """
        super(PriorityScheduler, self).__init__(*args, **kwargs)

    def _make_schedule(self, blocks):
        """
        Place each block at its best-scoring open time, lowest ``priority``
        value first.

        A score array over a time grid of ``self.time_resolution`` is built
        for all blocks; already-used grid points are masked out. For each
        block the start index with the highest summed score over a window of
        (block duration + transition buffer) is chosen, transitions to the
        neighbouring observing blocks are created or updated, and the block
        is inserted. Blocks with no usable window are reported with ``print``
        and skipped.

        Parameters
        ----------
        blocks : list of `~astroplan.scheduling.ObservingBlock` objects
            Blocks to schedule. Their ``duration`` is rounded up to a multiple
            of the time resolution when they are scheduled.

        Returns
        -------
        schedule : `~astroplan.scheduling.Schedule`
            ``self.schedule`` with the new slots inserted.
        """
        # Combine individual constraints with global constraints, and
        # retrieve priorities from each block to define scheduling order

        # Work on copies: the scheduler-wide list and the per-block lists
        # (shared with the caller's blocks through the shallow copy) must not
        # grow as a side effect of scheduling.
        global_constraints = [] if self.constraints is None else list(self.constraints)
        _block_priorities = np.zeros(len(blocks))
        for i, b in enumerate(blocks):
            own = None if b.constraints is None else list(b.constraints)
            combined = global_constraints + (own or [])
            # to make sure the scheduler has some constraint to work off of
            # and to prevent scheduling of targets below the horizon
            if not any(isinstance(c, AltitudeConstraint) for c in combined):
                combined.append(AltitudeConstraint(min=0 * u.deg))
                b.constraints = (own or []) + [AltitudeConstraint(min=0 * u.deg)]
            b._all_constraints = combined
            b._duration_offsets = u.Quantity([0 * u.second, b.duration / 2, b.duration])
            _block_priorities[i] = b.priority
            b.observer = self.observer

        # Define a master schedule
        # Generate grid of time slots, and a mask for previous observations

        time_resolution = self.time_resolution
        times = time_grid_from_range([self.schedule.start_time, self.schedule.end_time],
                                     time_resolution=time_resolution)
        is_open_time = np.ones(len(times), bool)
        # close times that are already filled
        for block in self.schedule.scheduled_blocks:
            filled = np.where((block.start_time < times) & (times < block.end_time))
            if len(filled[0]) > 0:
                is_open_time[filled[0]] = False
                # also close the grid point just before; clamp at 0 so the
                # index cannot wrap around to the end of the grid
                is_open_time[max(min(filled[0]) - 1, 0)] = False
        # generate the score arrays for all of the blocks
        scorer = Scorer(blocks, self.observer, self.schedule,
                        global_constraints=global_constraints)
        score_array = scorer.create_score_array(time_resolution)

        # Sort the list of blocks by priority
        sorted_indices = np.argsort(_block_priorities)

        # Compute the optimal observation time in priority order
        for i in sorted_indices:
            b = blocks[i]
            # Compute possible observing times by combining object constraints
            # with the master open times mask
            constraint_scores = score_array[i]

            # Add up the applied constraints to prioritize the best blocks
            # And then remove any times that are already scheduled
            constraint_scores[~is_open_time] = 0
            # Select the most optimal time

            # need to leave time around the Block for transitions
            if self.transitioner.instrument_reconfig_times:
                max_config_time = sum([max(value.values()) for value in
                                       self.transitioner.instrument_reconfig_times.values()])
            else:
                max_config_time = 0*u.second
            # ``is not None`` rather than truthiness: the truth value of a
            # Quantity is ambiguous
            if self.transitioner.slew_rate is not None:
                buffer_time = (160*u.deg/self.transitioner.slew_rate + max_config_time)
            else:
                buffer_time = max_config_time
            # TODO: make it so that this isn't required to prevent errors in slot creation
            total_duration = b.duration + buffer_time
            # calculate the number of time slots needed for this exposure
            # (builtin ``int``: the ``np.int`` alias no longer exists in NumPy)
            _stride_by = int(np.ceil(float(total_duration / time_resolution)))

            # Stride the score arrays by that number
            _strided_scores = stride_array(constraint_scores, _stride_by)

            # Collapse the sub-arrays
            # (run them through scorekeeper again? Just add them?
            # If there's a zero anywhere in there, def. have to skip)
            good = np.all(_strided_scores > 1e-5, axis=1)
            sum_scores = np.zeros(len(_strided_scores))
            sum_scores[good] = np.sum(_strided_scores[good], axis=1)

            if np.all(constraint_scores == 0) or not np.any(good):
                # No further calculation if no times meet the constraints
                _is_scheduled = False

            else:
                # If an optimal block is available, _is_scheduled=True
                best_time_idx = np.argmax(sum_scores)
                start_time_idx = best_time_idx
                new_start_time = times[best_time_idx]
                _is_scheduled = True

            if _is_scheduled:
                # set duration such that the Block will fit in the strided array
                duration_indices = int(np.ceil(float(b.duration / time_resolution)))
                b.duration = duration_indices * time_resolution
                # add 1 second to the start time to allow for scheduling at the start of a slot
                slot_index = [q for q, slot in enumerate(self.schedule.slots)
                              if slot.start < new_start_time + 1*u.second < slot.end][0]
                slots_before = self.schedule.slots[:slot_index]
                slots_after = self.schedule.slots[slot_index + 1:]
                # this has to remake transitions between already existing ObservingBlocks
                if slots_before:
                    if isinstance(self.schedule.slots[slot_index - 1].block, ObservingBlock):
                        # make a transition object after the previous ObservingBlock
                        tb = self.transitioner(self.schedule.slots[slot_index - 1].block, b,
                                               self.schedule.slots[slot_index - 1].end,
                                               self.observer)
                        # we may have a None type transition, which indicates two identical
                        # ObservingBlocks, so no need to insert TransitionBlock
                        if tb is not None:
                            times_indices = int(np.ceil(float(tb.duration / time_resolution)))
                            tb.duration = times_indices * time_resolution
                            start_idx = self.schedule.slots[slot_index - 1].block.end_idx
                            end_idx = times_indices + start_idx
                            # this may make some OBs get sub-optimal scheduling, but it closes gaps
                            # TODO: determine a reasonable range inside which it gets shifted
                            if (new_start_time - tb.start_time < tb.duration or
                                    abs(new_start_time - tb.end_time) < self.gap_time):
                                new_start_time = tb.end_time
                                start_time_idx = end_idx
                            self.schedule.insert_slot(tb.start_time, tb)
                            is_open_time[start_idx: end_idx] = False
                            # a slot was inserted ahead of the target slot
                            slot_index += 1
                            # Remove times from the master time list (copied in later code blocks)
                    elif isinstance(self.schedule.slots[slot_index - 1].block, TransitionBlock):
                        # change the existing TransitionBlock to what it needs to be now
                        tb = self.transitioner(self.schedule.slots[slot_index - 2].block, b,
                                               self.schedule.slots[slot_index - 2].end,
                                               self.observer)
                        # we may have a None type transition, which indicates two identical
                        # ObservingBlocks, so no need to insert TransitionBlock
                        if tb is not None:
                            times_indices = int(np.ceil(float(tb.duration / time_resolution)))
                            tb.duration = times_indices * time_resolution
                            start_idx = self.schedule.slots[slot_index - 2].block.end_idx
                            end_idx = times_indices + start_idx
                            self.schedule.change_slot_block(slot_index - 1, new_block=tb)
                            if (new_start_time - tb.start_time < tb.duration or
                                    abs(new_start_time - tb.end_time) < self.gap_time):
                                new_start_time = tb.end_time
                                start_time_idx = end_idx
                            is_open_time[start_idx: end_idx] = False
                end_time_idx = duration_indices + start_time_idx

                if slots_after:
                    if isinstance(self.schedule.slots[slot_index + 1].block, ObservingBlock):
                        # make a transition object after the new ObservingBlock
                        tb = self.transitioner(b, self.schedule.slots[slot_index + 1].block,
                                               new_start_time + b.duration, self.observer)
                        # we may have a None type transition, which indicates two identical
                        # ObservingBlocks, so no need to insert TransitionBlock
                        if tb is not None:
                            times_indices = int(np.ceil(float(tb.duration / time_resolution)))
                            tb.duration = times_indices * time_resolution
                            self.schedule.insert_slot(tb.start_time, tb)
                            start_idx = end_time_idx
                            end_idx = start_idx + times_indices
                            is_open_time[start_idx: end_idx] = False

                # now assign the block itself times and add it to the schedule
                b.constraints = b._all_constraints
                # remembered so later transitions can find where this block ends
                b.end_idx = end_time_idx
                self.schedule.insert_slot(new_start_time, b)
                is_open_time[start_time_idx: end_time_idx] = False

            else:
                print("could not schedule", b.target.name)

        return self.schedule
class Transitioner(object):
    """
    A class that defines how to compute transition times from one block to
    another.
    """

    # The decorator was previously written without the leading ``@``, which
    # made it a stand-alone expression that validated nothing.
    @u.quantity_input(slew_rate=u.deg/u.second)
    def __init__(self, slew_rate=None, instrument_reconfig_times=None):
        """
        Parameters
        ----------
        slew_rate : `~astropy.units.Quantity` with angle/time units
            The slew rate of the telescope
        instrument_reconfig_times : dict of dicts or None
            If not None, gives a mapping from property names to another
            dictionary. The second dictionary maps 2-tuples of states to the
            time it takes to transition between those states (as an
            `~astropy.units.Quantity`), can also take a 'default' key
            mapped to a default transition time.
        """
        self.slew_rate = slew_rate
        self.instrument_reconfig_times = instrument_reconfig_times

    def __call__(self, oldblock, newblock, start_time, observer):
        """
        Determines the amount of time needed to transition from one observing
        block to another.  This uses the parameters defined in
        ``self.instrument_reconfig_times``.

        Parameters
        ----------
        oldblock : `~astroplan.scheduling.ObservingBlock` or None
            The initial configuration/target
        newblock : `~astroplan.scheduling.ObservingBlock` or None
            The new configuration/target to transition to
        start_time : `~astropy.time.Time`
            The time the transition should start
        observer : `astroplan.Observer`
            The observer at the time

        Returns
        -------
        transition : `~astroplan.scheduling.TransitionBlock` or None
            A transition to get from ``oldblock`` to ``newblock`` or `None` if
            no transition is necessary (including when either block is None)
        """
        # With a block missing there is nothing to transition between.
        if oldblock is None or newblock is None:
            return None

        components = {}
        if self.slew_rate is not None:
            # use the constraints cache for now, but should move that machinery
            # to observer
            from .constraints import _get_altaz
            from astropy.time import Time
            if oldblock.target != newblock.target:
                aaz = _get_altaz(Time([start_time]), observer,
                                 [oldblock.target, newblock.target])['altaz']
                # TODO: make this [0] unnecessary by fixing _get_altaz to behave well in scalar-time case
                sep = aaz[0].separation(aaz[1])[0]
                # slews of a second or less are not recorded
                if sep/self.slew_rate > 1 * u.second:
                    components['slew_time'] = sep / self.slew_rate

        if self.instrument_reconfig_times is not None:
            components.update(self.compute_instrument_transitions(oldblock, newblock))

        if components:
            return TransitionBlock(components, start_time)
        else:
            return None

    def compute_instrument_transitions(self, oldblock, newblock):
        """
        Look up the reconfiguration time for every configuration key present
        in both blocks.

        For each such key, the time registered for the exact
        ``(old state, new state)`` pair is used if there is one; otherwise
        the ``'default'`` time is used when the two states differ.

        Returns
        -------
        components : dict
            Maps ``'<name>:<old> to <new>'`` strings to transition times.
        """
        components = {}
        for conf_name, old_conf in oldblock.configuration.items():
            if conf_name in newblock.configuration:
                conf_times = self.instrument_reconfig_times.get(conf_name,
                                                                None)
                if conf_times is not None:
                    new_conf = newblock.configuration[conf_name]
                    ctime = conf_times.get((old_conf, new_conf), None)
                    def_time = conf_times.get('default', None)
                    if ctime is not None:
                        s = '{0}:{1} to {2}'.format(conf_name, old_conf,
                                                    new_conf)
                        components[s] = ctime
                    # ``is not None`` rather than truthiness: the truth value
                    # of a Quantity is ambiguous
                    elif def_time is not None and not old_conf == new_conf:
                        s = '{0}:{1} to {2}'.format(conf_name, old_conf,
                                                    new_conf)
                        components[s] = def_time

        return components