Source code for labscheduler.dev_tools.eval_schedule

import logging
from datetime import datetime, timedelta
from typing import NamedTuple

from labscheduler.structures import JSSP, MoveOperation, Schedule

# Default weight applied to the makespan term in objective_value().
DEFAULT_ALPHA = 20

# Module-level logger used by the checks in this module to report violations.
logger = logging.getLogger(__name__)


class LevelChange(NamedTuple):
    """A single step change of a per-machine counter at a given moment.

    The capacity checks of this module collect such events, order them by
    ``time`` and add ``change`` to the counter kept for ``machine``.
    """

    # Key of the affected machine in the per-machine counters.
    machine: str
    # Moment at which the change takes effect.
    time: datetime
    # Signed amount added to the counter (this module uses +1 and -1).
    change: int
def check_completeness(inst: JSSP, sol: Schedule) -> bool:
    """Tell whether the schedule has an entry for every operation of the instance.

    :param inst: Problem instance; the keys of ``inst.operations_by_id`` are the required ids.
    :param sol: Schedule that is probed with ``in`` for each of those ids.
    :return: ``True`` if no operation id is missing from ``sol``, ``False`` otherwise.
    """
    for operation_id in inst.operations_by_id:
        if operation_id not in sol:
            return False
    return True
def check_spacial_capacities(inst: JSSP, sol: Schedule) -> bool:
    """Replay all move operations and verify that no machine gets overfilled.

    Every ``MoveOperation`` adds one unit to its ``"target"`` machine at the
    scheduled start and removes one unit from its ``"origin"`` machine 0.1 s
    before the move ends. All counters start at zero.

    :param inst: Problem instance providing the operations and the machines.
    :param sol: Schedule providing start time and machine assignment per operation id.
    :return: ``False`` (after logging a warning) as soon as a counter exceeds the
        machine's ``max_capacity``; ``True`` if that never happens.
    """
    machines = inst.machine_collection.machine_by_id
    filling = dict.fromkeys(machines, 0)

    events = []
    for idx, op in inst.operations_by_id.items():
        if not isinstance(op, MoveOperation):
            continue
        assignment = sol[idx]
        origin = assignment.machines_to_use["origin"]
        target = assignment.machines_to_use["target"]
        # The target is occupied from the very start of the move ...
        events.append(LevelChange(target, assignment.start, 1))
        # ... while the origin is released slightly (0.1 s) before the move ends.
        release = assignment.start + timedelta(seconds=op.duration - 0.1)
        events.append(LevelChange(origin, release, -1))

    # Stable chronological ordering of all events.
    events.sort(key=lambda event: event.time)

    # Simulate through all movements and check whether spacial capacities get violated.
    for change in events:
        filling[change.machine] += change.change
        if filling[change.machine] > machines[change.machine].max_capacity:
            logger.warning(f"capacity of {change.machine} gets exceeded.")
            return False
    return True
def check_process_capacities(inst: JSSP, sol: Schedule) -> bool:
    """Replay all operations and verify that no machine runs too many at once.

    Every operation adds one unit of workload to its ``"main"`` machine at the
    scheduled start and removes it 0.1 s before the operation's duration has
    elapsed. All workloads start at zero.

    :param inst: Problem instance providing the operations and the machines.
    :param sol: Schedule providing start time and machine assignment per operation id.
    :return: ``False`` (after logging a warning) as soon as a workload exceeds the
        machine's ``process_capacity``; ``True`` if that never happens.
    """
    machines = inst.machine_collection.machine_by_id
    workload = dict.fromkeys(machines, 0)

    events = []
    for idx, op in inst.operations_by_id.items():
        assignment = sol[idx]
        executor = assignment.machines_to_use["main"]
        events.append(LevelChange(executor, assignment.start, 1))
        # The executor is released slightly (0.1 s) before the nominal end.
        release = assignment.start + timedelta(seconds=op.duration - 0.1)
        events.append(LevelChange(executor, release, -1))

    # Stable chronological ordering of all events.
    events.sort(key=lambda event: event.time)

    # Simulate through all executions and check whether process capacities get violated.
    for change in events:
        workload[change.machine] += change.change
        if workload[change.machine] > machines[change.machine].process_capacity:
            logger.warning(f"processing capacity of {change.machine} gets exceeded.")
            return False
    return True
# Slack (added to / subtracted from a waiting time measured in seconds) that
# check_waiting() grants before reporting a min/max waiting-time violation.
WAIT_TOLERANCE = 2
def check_waiting(inst: JSSP, sol: Schedule) -> bool:
    """Verify the waiting-time bounds between each operation and its predecessors.

    The waiting time is the gap between the two scheduled starts (in seconds)
    minus the predecessor's duration. It may undershoot ``min_wait`` or
    overshoot ``max_wait`` by at most ``WAIT_TOLERANCE``.

    :param inst: Problem instance providing operations, precedences and wait bounds.
    :param sol: Schedule providing the start time per operation id.
    :return: ``False`` (after logging a warning) at the first violated bound,
        ``True`` if all bounds hold.
    """
    operations = inst.operations_by_id
    for idx, op in operations.items():
        for idx_o in op.preceding_operations:
            predecessor = operations[idx_o]
            start_gap = (sol[idx].start - sol[idx_o].start).total_seconds()
            wait_time = start_gap - predecessor.duration

            # Lower bound: the successor must not start too early.
            if wait_time + WAIT_TOLERANCE < op.min_wait[idx_o]:
                logger.warning(
                    f"Waiting time between {idx_o} and {idx} is {op.min_wait[idx_o] - wait_time} seconds too short",
                )
                return False

            # Upper bound: the successor must not start too late.
            if wait_time - WAIT_TOLERANCE > op.max_wait[idx_o]:
                logger.warning(f"Waiting time between {idx_o} and {idx} is {wait_time - op.max_wait[idx_o]} too long")
                return False
    return True
def check_load_while_work(inst: JSSP, sol: Schedule) -> bool:
    """Placeholder check that currently accepts every schedule.

    Neither argument is inspected; the function unconditionally returns ``True``.

    NOTE(review): judging by the name this is presumably meant to verify that
    machines are not loaded while they are working -- not implemented; confirm.

    :param inst: Problem instance (unused).
    :param sol: Schedule (unused).
    :return: Always ``True``.
    """
    return True
def is_feasible_solution(inst: JSSP, sol: Schedule | None) -> bool:
    """Run all feasibility checks of this module on a schedule.

    The checks are evaluated in this order and stop at the first failure:
    completeness, process capacities, spacial capacities, waiting times and
    load-while-work.

    :param inst: Problem instance the schedule is supposed to solve.
    :param sol: Schedule to verify; ``None`` is treated as "no solution" and is
        never feasible.
    :return: ``True`` only if every check passes. ``False`` if any check fails,
        if ``sol`` is ``None`` or if a check raises an exception (which is
        logged together with its traceback).
    """
    if sol is None:
        # Without a schedule there is nothing to check; avoid provoking an
        # exception in the checks just to reach the handler below.
        return False
    try:
        # Completeness goes first: the remaining checks index the schedule by
        # every operation id and would raise on a missing entry.
        if not check_completeness(inst, sol):
            logger.warning("Schedule does not contain all operations of the instance.")
            return False
        return (
            check_process_capacities(inst, sol)
            and check_spacial_capacities(inst, sol)
            and check_waiting(inst, sol)
            and check_load_while_work(inst, sol)
        )
    except Exception:
        # Malformed schedules (e.g. missing machine assignments) must not crash
        # the caller; they are reported and counted as infeasible.
        logger.exception("Checking solution failed")
        return False
def objective_value(inst: JSSP, sol: Schedule, alpha: float = DEFAULT_ALPHA) -> float:
    """Compute the weighted sum of makespan and waiting costs of a schedule.

    The makespan is the time in seconds from the earliest scheduled start to
    the latest finish (start plus duration). For every precedence pair, the
    time in seconds between the predecessor's finish and the successor's start
    is multiplied by the successor's ``wait_cost`` for that predecessor.

    :param inst: Problem instance providing durations, precedences and wait costs.
    :param sol: Schedule providing the start time per operation id.
    :param alpha: Weight of the makespan term.
    :return: ``alpha * makespan + total waiting cost``.
    """
    earliest_start = min(sol[idx].start for idx in sol)

    # Finish time of every operation of the instance, keyed by operation id.
    finish_id = {}
    for idx, op in inst.operations_by_id.items():
        finish_id[idx] = sol[idx].start + timedelta(seconds=op.duration)
    latest_finish = max(finish_id.values())
    makespan = (latest_finish - earliest_start).total_seconds()

    # Accumulate term by term, in iteration order of operations and predecessors.
    total_wait_cost = 0
    for idx, op in inst.operations_by_id.items():
        for idx_o in op.preceding_operations:
            waited = sol[idx].start - finish_id[idx_o]
            total_wait_cost += op.wait_cost[idx_o] * waited.total_seconds()

    return alpha * makespan + total_wait_cost