# Generated by sila2.code_generator; sila2.__version__: 0.7.3
from __future__ import annotations
from datetime import datetime
from typing import TYPE_CHECKING
import pytz
from labscheduler.sila_server.generated.schedulingservice import (
AlgorithmMetaData,
ComputationError,
ComputeSchedule_Responses,
MissingAlgorithmError,
SchedulingServiceBase,
SelectAlgorithm_Responses,
WorkflowGraph,
)
if TYPE_CHECKING:
from sila2.server import MetadataDict, ObservableCommandInstance
from labscheduler.scheduler_interface import SchedulerInterface
from labscheduler.sila_server import Server
from labscheduler.structures import MoveOperation, Operation, RequiredMachine
[docs]
class SchedulingServiceImpl(SchedulingServiceBase):
    """SiLA ``SchedulingService`` feature implementation.

    Each property and command handler forwards to the ``SchedulerInterface``
    handed in at construction time and converts between the SiLA data
    structures and the scheduler's own ``Operation`` objects.
    """

    def __init__(self, parent_server: Server, scheduler_interface: SchedulerInterface) -> None:
        """Initialise the base class with ``parent_server`` and keep the scheduler backend."""
        super().__init__(parent_server=parent_server)
        self.scheduler_interface = scheduler_interface

    def get_CurrentAlgorithm(self, *, metadata: MetadataDict) -> AlgorithmMetaData:
        """Return the scheduler's ``current_algorithm_info``."""
        return self.scheduler_interface.current_algorithm_info

    def SelectAlgorithm(self, AlgorithmName: str, *, metadata: MetadataDict) -> SelectAlgorithm_Responses:
        """Ask the scheduler to switch to the algorithm called ``AlgorithmName``.

        Raises:
            MissingAlgorithmError: if ``select_algorithm`` reports a falsy result.
        """
        if not self.scheduler_interface.select_algorithm(algorithm_name=AlgorithmName):
            raise MissingAlgorithmError

    def get_AvailableAlgorithms(self, *, metadata: MetadataDict) -> list[AlgorithmMetaData]:
        """Return the scheduler's ``available_algorithms``."""
        return self.scheduler_interface.available_algorithms

    @staticmethod
    def _parse_optional_timestamp(raw: str) -> datetime | None:
        """Turn the literal string ``"None"`` into ``None``; parse anything else as ISO format."""
        if raw == "None":
            return None
        return datetime.fromisoformat(raw)

    def _operation_from_node(self, node) -> Operation:
        """Build the scheduler operation described by one workflow-graph node."""
        name = node.Idx
        duration = node.Duration
        start = self._parse_optional_timestamp(node.StartTime)
        finish = self._parse_optional_timestamp(node.Finish)
        requirements = [
            RequiredMachine(
                type=resource.Type,
                tag=resource.Tag,
                # The string "None" stands for "no preferred machine".
                preferred=None if resource.Preferred == "None" else resource.Preferred,
            )
            for resource in node.RequiredResources
        ]
        # A node with a requirement tagged "target" is treated as a movement.
        operation_cls = MoveOperation if any(req.tag == "target" for req in requirements) else Operation
        operation = operation_cls(
            name=name,
            duration=duration,
            required_machines=requirements,
            start=start,
            finish=finish,
        )
        # Only override the operation's wait-to-start costs when a truthy value was sent.
        if node.WaitToStartCost:
            operation.wait_to_start_costs = node.WaitToStartCost
        return operation

    def ComputeSchedule(
        self,
        WorkflowGraph: WorkflowGraph,
        MaxComputationTime: float,
        *,
        metadata: MetadataDict,
        instance: ObservableCommandInstance,
    ) -> ComputeSchedule_Responses:
        """Translate the workflow graph into operations and compute a schedule.

        Nodes become ``Operation``/``MoveOperation`` objects keyed by ``Idx``;
        every edge registers its tail as a predecessor of its head together with
        the edge's wait cost and min/max waiting times. The operations are then
        passed to ``scheduler_interface.compute_schedule`` with
        ``MaxComputationTime``.

        Returns:
            A tuple ``(entries, quality)``. Each entry is a list of
            ``[operation name, start localized to Europe/Berlin,
            machine_precedences, [(tag, machine), ...]]`` and ``quality`` is the
            upper-cased ``name`` of the quality value returned by the scheduler.

        Raises:
            ComputationError: if the scheduler returns a falsy schedule.
        """
        instance.begin_execution()  # set execution status from `waiting` to `running`

        operations = {node.Idx: self._operation_from_node(node) for node in WorkflowGraph.Nodes}

        for edge in WorkflowGraph.Edges:
            predecessor = edge.Tail
            successor = operations[edge.Head]
            successor.preceding_operations.append(predecessor)
            successor.wait_cost[predecessor] = edge.WaitCost
            successor.max_wait[predecessor] = edge.MaxWaitingTime
            successor.min_wait[predecessor] = edge.MinWaitingTime

        schedule, quality = self.scheduler_interface.compute_schedule(operations.values(), MaxComputationTime)
        if not schedule:
            # Nothing could be scheduled: report it as the defined execution error.
            raise ComputationError

        # NOTE(review): pytz's localize() expects naive datetimes - confirm that
        # the scheduler returns naive start times.
        berlin = pytz.timezone("Europe/Berlin")
        sila_schedule = [
            [
                operation_name,
                berlin.localize(assignment.start),
                assignment.machine_precedences,
                [(tag, machine) for tag, machine in assignment.machines_to_use.items()],
            ]
            for operation_name, assignment in schedule.items()
        ]
        return (sila_schedule, quality.name.upper())