Source code for labscheduler.sila_server.feature_implementations.schedulingservice_impl

# Generated by sila2.code_generator; sila2.__version__: 0.7.3
from __future__ import annotations

from datetime import datetime
from typing import TYPE_CHECKING

import pytz

from labscheduler.sila_server.generated.schedulingservice import (
    AlgorithmMetaData,
    ComputationError,
    ComputeSchedule_Responses,
    MissingAlgorithmError,
    SchedulingServiceBase,
    SelectAlgorithm_Responses,
    WorkflowGraph,
)

if TYPE_CHECKING:
    from sila2.server import MetadataDict, ObservableCommandInstance

    from labscheduler.scheduler_interface import SchedulerInterface
    from labscheduler.sila_server import Server

from labscheduler.structures import MoveOperation, Operation, RequiredMachine


[docs] class SchedulingServiceImpl(SchedulingServiceBase): def __init__(self, parent_server: Server, scheduler_interface: SchedulerInterface) -> None: super().__init__(parent_server=parent_server) self.scheduler_interface = scheduler_interface
[docs] def get_CurrentAlgorithm(self, *, metadata: MetadataDict) -> AlgorithmMetaData: return self.scheduler_interface.current_algorithm_info
[docs] def SelectAlgorithm(self, AlgorithmName: str, *, metadata: MetadataDict) -> SelectAlgorithm_Responses: found = self.scheduler_interface.select_algorithm(algorithm_name=AlgorithmName) if not found: raise MissingAlgorithmError
[docs] def get_AvailableAlgorithms(self, *, metadata: MetadataDict) -> list[AlgorithmMetaData]: return self.scheduler_interface.available_algorithms
[docs] def ComputeSchedule( self, WorkflowGraph: WorkflowGraph, MaxComputationTime: float, *, metadata: MetadataDict, instance: ObservableCommandInstance, ) -> ComputeSchedule_Responses: instance.begin_execution() # set execution status from `waiting` to `running` operation_by_id = {} for node in WorkflowGraph.Nodes: idx = node.Idx duration = node.Duration start = None if node.StartTime == "None" else datetime.fromisoformat(node.StartTime) finish = None if node.Finish == "None" else datetime.fromisoformat(node.Finish) requirements = [ RequiredMachine(type=rm.Type, tag=rm.Tag, preferred=None if rm.Preferred == "None" else rm.Preferred) for rm in node.RequiredResources ] is_movement = "target" in [rm.tag for rm in requirements] if is_movement: operation = MoveOperation( name=idx, duration=duration, required_machines=requirements, start=start, finish=finish, ) else: operation = Operation( name=idx, duration=duration, required_machines=requirements, start=start, finish=finish, ) operation_by_id[idx] = operation if node.WaitToStartCost: operation.wait_to_start_costs = node.WaitToStartCost for edge in WorkflowGraph.Edges: u = edge.Tail v = edge.Head operation_by_id[v].preceding_operations.append(u) operation_by_id[v].wait_cost[u] = edge.WaitCost operation_by_id[v].max_wait[u] = edge.MaxWaitingTime operation_by_id[v].min_wait[u] = edge.MinWaitingTime schedule, quality = self.scheduler_interface.compute_schedule(operation_by_id.values(), MaxComputationTime) if schedule: sila_schedule = [ [ name, pytz.timezone("Europe/Berlin").localize(assign.start), assign.machine_precedences, [(tag, name) for tag, name in assign.machines_to_use.items()], ] for name, assign in schedule.items() ] else: # No schedule could be computed. Raising a ComputationError defined execution error. raise ComputationError return (sila_schedule, quality.name.upper())