"""
Here we define the structures of a process, a scheduling instance etc.
"""
from collections.abc import Iterable
from copy import deepcopy
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import NamedTuple
import networkx as nx
from labscheduler.logging_manager import scheduler_logger as logger
[docs]
@dataclass
class Machine:
"""Container for a machine.
Args:
name: The name of the machine.
type: The type of the machine.
max_capacity: The maximum number of labware this machine can hold.
min_capacity: The minimum number of labware, this machine has to hold to be operational
process_capacity: The maximum number of operations this machine can do at a time.
allows_overlap: Whether operations, that have overlapping but not identical execution_time are allowed.
"""
name: str
type: str
max_capacity: int
min_capacity: int = 1
process_capacity: int = 1
allows_overlap: bool = False
[docs]
class RequiredMachine(NamedTuple):
"""Container for a requirement of an operation"""
type: str
tag: str
preferred: str | None = None
[docs]
@dataclass
class Operation:
"""Editable data collection for a general operations in a workflow."""
name: str # important for referencing.
duration: float # duration of the operation in seconds
start: datetime | None = None
finish: datetime | None = None
preceding_operations: list[str] = field(default_factory=list) # list of the names of operations required for this
required_machines: list[RequiredMachine] = field(default_factory=list)
wait_cost: dict[str, float] = field(default_factory=dict) # waiting costs after prior operations(linked by name)
max_wait: dict[str, int] = field(default_factory=dict) # maximum waiting times after prior operations
min_wait: dict[str, int] = field(default_factory=dict) # minimum waiting times after prior operations
wait_to_start_costs: float = 0.5 # for numeric reasons we do not want it to be 0
@property
def main_machine(self) -> RequiredMachine | None:
for used_machine in self.required_machines:
if used_machine.tag == "main":
return used_machine
return None
[docs]
@dataclass
class MoveOperation(Operation):
"""Operation resembling a movement of a container"""
pref_dest_pos: int | None = None # optional preferences for the destination slot number
destination_pos: int = 0 # actual destination slot number (set at execution runtime)
origin_pos: int = 0 # this should not be relevant for the execution but might be nice for logging
@property
def origin_machine(self) -> RequiredMachine | None:
for used_machine in self.required_machines:
if used_machine.tag == "origin":
return used_machine
return None
@property
def target_machine(self) -> RequiredMachine | None:
for used_machine in self.required_machines:
if used_machine.tag == "target":
return used_machine
return None
[docs]
@dataclass
class ScheduledAssignment:
"""
Every operation is assigned this data in a schedule
"""
start: datetime # scheduled start of the operation
# List of operations scheduled involving the same machine(s), that have to finish prior
machine_precedences: list[str] = field(default_factory=list)
# participating are assigned by their tag in used_machines. e.g.: 'target'->'Carousel' or 'main'->'F5'
machines_to_use: dict[str, str] = field(default_factory=dict)
Schedule = dict[str, ScheduledAssignment]
[docs]
class MachineCollection:
"""
This class is defined by just the list of available machines, but provides certain utilities
"""
machine_by_id: dict[str, Machine]
machine_class_sizes: dict[str, int]
machine_by_class: dict[str, list[Machine]]
n_machine_classes: int
n_machines: int
min_capacity_machines: dict[str, Machine]
dump: Machine
def __init__(self, machines: list[Machine]):
# this avoids changing the original job shop (,i.e., adding a dummy)
machines = deepcopy(machines)
# create a dumping place so the schedule is for cut off workflows is still feasible
self.dump = Machine(
name="DummyDump",
type="Dump",
max_capacity=999,
process_capacity=999,
allows_overlap=True,
)
machines.append(self.dump)
# create some convenience dictionaries
self.machine_by_id = {m.name: m for m in machines}
machine_classes = {m.type for m in machines}
self.machines_by_class = {cls: [m for m in machines if m.type == cls] for cls in machine_classes}
self.n_machine_classes = len(machine_classes)
self.n_machines = len(machines)
self.machine_class_sizes = {cls: len(self.machines_by_class[cls]) for cls in machine_classes}
self.min_capacity_machines = {m.name: m for m in machines if m.min_capacity > 1}
[docs]
class JSSP:
"""
An instance of a special operation shop problem
"""
operations_by_id: dict[str, Operation]
machine_collection: MachineCollection
_dummys: list[str]
def __init__(self, operations: Iterable[Operation], machines: list[Machine]):
self.machine_collection = MachineCollection(machines)
self.operations_by_id = {op.name: op for op in operations}
self._dummys = []
self._wfg = None
[docs]
def is_dummy(self, op: Operation) -> bool:
"""
Utilitxy function to check whether an operation is an artificial node (dummy)
:param op:
:return:
"""
return op.name in self._dummys
[docs]
def add_dummys(self):
"""
Utility function adding dummy nodes to ensure solvability of truncated workflows
"""
if not self.machine_collection.machines_by_class["MoverServiceResource"]:
# no movers means no dummy movements
return
# detect all last operations
all_priors = set()
for op in self.operations_by_id.values():
all_priors.update(op.preceding_operations)
last_operations = list(filter(lambda idx: idx not in all_priors, self.operations_by_id))
# filter for those last operations leaving ending in places of limited space
critical_last_op = []
moveable_objects = len(last_operations) # TODO How to I really extract this number from the problem???
for idx in last_operations:
op = self.operations_by_id[idx]
if isinstance(op, MoveOperation):
ending_place = op.target_machine
else:
ending_place = op.main_machine
# get the spacial capacity if the executors or the minimum of all possile executors
if ending_place.preferred:
capacity = self.machine_collection.machine_by_id[ending_place.preferred].max_capacity
else:
possible_executors = self.machine_collection.machines_by_class[ending_place.type]
capacity = min(machine.max_capacity for machine in possible_executors)
if capacity < moveable_objects:
critical_last_op.append(op)
logger.debug(f"adding {len(last_operations)} dummys to {last_operations}")
for i, op in enumerate(critical_last_op):
if isinstance(op, MoveOperation):
last_place = op.target_machine
else:
last_place = op.main_machine
dump = self.machine_collection.dump
mover_name = self.machine_collection.machines_by_class["MoverServiceResource"][0].name
move_to_dump = MoveOperation(
name=f"move_{i}_to_dump",
preceding_operations=[op.name],
duration=50,
wait_cost={op.name: 1},
min_wait={op.name: 0},
max_wait={op.name: 2**31},
required_machines=[
RequiredMachine(last_place.type, tag="origin", preferred=last_place.preferred),
RequiredMachine(type="MoverServiceResource", tag="main", preferred=mover_name),
RequiredMachine(type=dump.type, tag="target", preferred=dump.name),
],
)
self.operations_by_id[move_to_dump.name] = move_to_dump
self._dummys.append(move_to_dump.name)
[docs]
def remove_dummys(self, schedule: Schedule):
"""
Utility function removing the dummy nodes which are created to ensure solveability of truncated workflows
:param schedule:
"""
for dummy in self._dummys:
if dummy in schedule:
schedule.pop(dummy)
[docs]
def start_operation_ids(self) -> list[str]:
"""
Utility function to get all operations that have no precedence constraints
:return:
"""
return [idx for idx, op in self.operations_by_id.items() if not op.preceding_operations]
[docs]
def start_occupations(self) -> dict[str, int]:
"""
Utility function to extract the initial occupation of all machines before the processes start
"""
occupation = dict.fromkeys(self.machine_collection.machine_by_id, 0)
for start_id in self.start_operation_ids():
start_op = self.operations_by_id[start_id]
if isinstance(start_op, MoveOperation):
start_machine = start_op.origin_machine
else:
start_machine = start_op.main_machine
# TODO this does not find those containers that have an operation in their starting machine and counts
# double reagents that have multiple parallel starting moves
if start_machine.preferred:
occupation[start_machine.preferred] += 0 # TODO change this 0 to 1 when the rest works
return occupation
[docs]
def create_wfg(self) -> nx.DiGraph:
"""
Utility function creating a networkx graph of the problems workflow
:return: A directed graph of type networkx.DiGraph
"""
g = nx.DiGraph()
for idx, op in self.operations_by_id.items():
g.add_node(idx)
for prior in op.preceding_operations:
g.add_edge(prior, idx)
return g
@property
def wfg(self) -> nx.DiGraph:
if not self._wfg:
self._wfg = self.create_wfg()
return self._wfg
[docs]
class SolutionQuality(Enum): # FIXME: add some obvious output when schedule is stuck.
OPTIMAL = 1
FEASIBLE = 2
INFEASIBLE = 3