Source code for labscheduler.solvers.cp_solver
"""
A solver implementation using Google OR-Tools to model and solve the JSSP as a constraint program (CP).
"""
import contextlib
import importlib.metadata
import importlib.util
import pickle
import re
import subprocess
import sys
from pathlib import Path
from threading import Thread

from labscheduler.logging_manager import scheduler_logger
from labscheduler.solver_interface import AlgorithmInfo, JSSPSolver
from labscheduler.structures import (
    JSSP,
    Schedule,
    SolutionQuality,
)
# Fail fast at import time when the optional 'ortools' dependency cannot be found.
if importlib.util.find_spec("ortools") is None:
    msg = (
        "The required optional dependency 'ortools' is not installed. Please install it using "
        "'pip install .[cpsolver]' to use the CP solver."
    )
    raise ModuleNotFoundError(msg)

# Check the ortools version. If it is too low, the solver will not work.
# Only the leading digits of the first three dot-separated components are used;
# a component without leading digits counts as 0.
ortools_version = importlib.metadata.version("ortools")
ortools_version_parts = [
    int(match.group()) if (match := re.match(r"\d+", part)) else 0
    for part in ortools_version.split(".")[:3]
]
# Pad with zeros so that the comparison below always sees three components.
ortools_version_tuple = tuple(ortools_version_parts) + (0,) * (3 - len(ortools_version_parts))
if ortools_version_tuple < (6, 0, 0):
    msg = f"The CP solver requires ortools>=6.0.0, but ortools=={ortools_version} is installed."
    raise ImportError(msg)
[docs]
class CPSolver(JSSPSolver):
    """JSSP solver that delegates the actual solve to ``cp_worker.py`` in a child process."""

    def compute_schedule(
        self,
        inst: JSSP,
        time_limit: float,
        offset: float,
        **kwargs,
    ) -> tuple[Schedule | None, SolutionQuality]:
        """Solve *inst* in a worker subprocess and return its schedule and quality.

        The problem (instance, time limit, offset and any extra keyword
        arguments) is pickled to the worker's stdin. The worker is expected to
        answer on stdout with a pickled mapping that contains the keys
        ``"schedule"`` and ``"quality"``.

        Args:
            inst: The problem instance, forwarded to the worker unchanged.
            time_limit: Time budget in seconds. The worker is killed if it has
                not finished after ``time_limit + 1`` seconds.
            offset: Forwarded to the worker unchanged.
            **kwargs: Additional entries forwarded to the worker unchanged.

        Returns:
            ``(schedule, quality)`` as reported by the worker, or
            ``(None, SolutionQuality.INFEASIBLE)`` when no complete result
            could be read (timeout, worker crash, malformed answer).
        """
        problem_data = {"inst": inst, "time_limit": time_limit, "offset": offset, **kwargs}
        # Serialise before spawning so that an unpicklable problem cannot leave
        # an orphaned worker process behind.
        payload = pickle.dumps(problem_data)
        # Get the path to cp_worker.py relative to this module
        worker_path = Path(__file__).parent / "cp_worker.py"
        # Run the worker with the interpreter that runs this module, so it sees
        # the same installed packages; fall back to PATH lookup only when the
        # interpreter path is unknown.
        interpreter = sys.executable or "python"
        # The context manager closes all pipes and reaps the child on exit.
        with subprocess.Popen(  # noqa: S603
            [interpreter, str(worker_path)],
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        ) as proc:
            try:
                # communicate() feeds stdin and drains stdout and stderr
                # concurrently, so a full pipe buffer cannot stall the worker.
                # Give the process generous (1 extra second) time to finish.
                out, err = proc.communicate(payload, timeout=time_limit + 1)
            except subprocess.TimeoutExpired:
                # timeout reached: stop the worker and collect whatever it wrote
                proc.kill()
                out, err = proc.communicate()
        result = {}
        # Best effort: empty, truncated or malformed output leaves `result` empty.
        # The bytes come from our own worker process, not from an external source.
        with contextlib.suppress(Exception):
            result.update(pickle.loads(out))  # noqa: S301
        # get the result
        try:
            quality = result["quality"]
            schedule = result["schedule"]
        except KeyError:
            schedule = None
            quality = SolutionQuality.INFEASIBLE
        # log stderr for debugging
        if err:
            scheduler_logger.error(f"Worker stderr:{err.decode('utf-8', errors='replace')}")
        return schedule, quality

    @staticmethod
    def get_algorithm_info() -> AlgorithmInfo:
        """Return the static ``AlgorithmInfo`` record describing this solver."""
        return AlgorithmInfo(
            name="CP-Solver",
            is_optimal=True,
            success_guaranty=True,
            max_problem_size=300,
        )

    def is_solvable(self, inst: JSSP) -> bool:
        """Report whether *inst* is solvable; currently always returns ``True`` without checking."""
        # model, vars_ = self.create_model(inst, 0)  # noqa: ERA001
        # TODO: there is a method in OR-tools to check solvability
        # state = self.solve_cp(model, vars_, 5)  # noqa: ERA001
        # return state in {OPTIMAL, FEASIBLE}  # noqa: ERA001
        return True