上一篇文章咱们介绍了 CP-SAT 的简略用法,以及 CP-SAT 蕴含的不同变量、束缚函数。实际出真知,本篇文章将用几个例子加深对 CP-SAT 应用的了解。
1. 员工排班
问题形容
医院主管须要为四名护士创立一个为期三天的时间表,但必须满足以下条件:
每天被分成三个 8 小时的班次。
每天,每个轮班调配给一名护士,没有护士的工作超过一班。
在三天的工作期间,每个护士至多要上两班。
代码及解读
from ortools.sat.python import cp_model
class NursesPartialSolutionPrinter(cp_model.CpSolverSolutionCallback):
def __init__(self, shifts, num_nurses, num_days, num_shifts, sols):
cp_model.CpSolverSolutionCallback.__init__(self)
self._shifts = shifts
self._num_nurses = num_nurses
self._num_days = num_days
self._num_shifts = num_shifts
self._solutions = set(sols)
self._solution_count = 0
#回调函数,每产生一个 solution 会调用一次回调函数。这里对回调函数进行重写。def on_solution_callback(self):
self._solution_count += 1
if self._solution_count in self._solutions:
print('Solution %i' % self._solution_count)
for d in range(self._num_days):
print('Day %i' % d)
for n in range(self._num_nurses):
is_working = False
for s in range(self._num_shifts):
if self.Value(self._shifts[(n, d, s)]):
is_working = True
print('Nurse %i works shift %i' % (n, s))
if not is_working:
print('Nurse {} does not work'.format(n))
print()
def solution_count(self):
return self._solution_count
def main():
# Data.
num_nurses = 4 # 护士数量
num_shifts = 3 # 每天班次
num_days = 3 # 一共要排多少天班
all_nurses = list(range(num_nurses))
all_shifts = list(range(num_shifts))
all_days = list(range(num_days))
# 创立 cp-sat 求解器
model = cp_model.CpModel()
# 创立 0 - 1 布尔变量变量,shifts[(n, d, s)]:示意护士 n 被调配到第 d 天的第 s 个班次
shifts = {}
for n in all_nurses:
for d in all_days:
for s in all_shifts:
shifts[(n, d, s)] = model.NewBoolVar('shift_n%id%is%i' % (n, d, s))
# 增加束缚,每个班次都必须有一名护士
for d in all_days:
for s in all_shifts:
model.Add(sum(shifts[(n, d, s)] for n in all_nurses) == 1)
# 增加束缚,每个护士一天最多只能做一个班次
for n in all_nurses:
for d in all_days:
model.Add(sum(shifts[(n, d, s)] for s in all_shifts) <= 1)
# 如何尽可能均匀地调配护士轮班,因为三天有九班,咱们能够给四名护士每人调配两班。# 之后还会有一个班次,能够调配给任何护士。# 以下代码确保每位护士在三天的工作期间至多上两班。# min_shifts_per_nurse 每个护士至多调配的班次数量
# max_shifts_per_nurse 每个护士至少调配的班次数量
min_shifts_per_nurse = (num_shifts * num_days) // num_nurses
max_shifts_per_nurse = min_shifts_per_nurse + 1
for n in all_nurses:
num_shifts_worked = sum(shifts[(n, d, s)] for d in all_days for s in all_shifts)
model.Add(min_shifts_per_nurse <= num_shifts_worked)
model.Add(num_shifts_worked <= max_shifts_per_nurse)
# 求解问题
solver = cp_model.CpSolver()
# 展现后果(只展现 5 条后果)a_few_solutions = range(5)
solution_printer = NursesPartialSolutionPrinter(shifts, num_nurses, num_days, num_shifts, a_few_solutions)
solver.SearchForAllSolutions(model, solution_printer)
# 统计分析.
print()
print('Statistics')
print('- conflicts : %i' % solver.NumConflicts())
print('- branches : %i' % solver.NumBranches())
print('- wall time : %f ms' % solver.WallTime())
print('- solutions found : %i' % solution_printer.solution_count())
if __name__ == '__main__':
main()
2. 车间调度
问题形容
作业车间中,在多台机器上解决多个作业。每个作业由一系列工作组成,这些工作必须依照给定的程序执行,并且每个工作必须在特定的机器上进行解决。例如,该工作能够是生产单个消费品,例如汽车。问题是将工作安顿在机器上,以最小化打算的长度——所有作业实现所需的工夫。
作业车间问题有几个限度:
- 在前一个作业工作实现之前,不能启动该作业的任何工作。
- 一台机器一次只能解决一项工作。
- 工作一旦启动,就必须始终运行到实现。
上面是工作车间问题一个简略的例子,每个工作由一对数字 (m,p) 示意,m 是机器的数量,p 示意工作的解决工夫。(作业和机器的编号从 0 开始。)
job 0 = [(0, 3), (1, 2), (2, 2)]
job 1 = [(0, 2), (2, 1), (1, 4)]
job 2 = [(1, 4), (2, 3)]
job 0 有三个工作,第一个 (0,3) 必须在机器 0 上以 3 个单位的工夫进行解决,第二个 (1,2) 必须在机器 1 上以 2 个单位的工夫解决,以此类推,总共有八个工作。
作业车间问题的后果是为每个任务分配一个启动工夫,该工夫满足下面给出的约束条件。下图显示了该问题的一种可能的解决方案:
问题的变量和束缚
决策变量:让 task(i, j)示意作业 i 序列中的第 j 个工作,定义为 t_i,j,问题的决策变量为特定工作 t 的开始工夫。
作业车间问题有两种类型的束缚:
优先束缚:这是因为对于同一作业中的任意两个间断工作,必须在启动第二个工作之前实现第一个工作。例如,task(0,2)和 task(0,3)是 job 0 的间断工作。因为 task(0,2)的解决工夫为 2,因而 task(0,3)的开始工夫必须在 task 2 的开始工夫之后至多为 2 个单位的工夫。后果失去如下束缚:
\(t_{0,2}+2<=t_{0,3} \)
没有重叠的束缚:这是因为机器不能同时解决两个工作。例如,task(0,2)和 task(2,1)都在机器 1 上解决。因为它们的解决工夫别离为 2 和 4,因而必须满足以下条件之一:
\(t_{0,2}+2<=t_{2,1} \),如果 task(0,2)在 task(2,1)之前被调度
\(t_{2,1}+4<=t_{0,2} \),如果 task(2,1)在 task(0,2)之前被调度
指标函数
作业车间问题的指标是最小化实现工夫:从作业的最早开始工夫到最近完结工夫的工夫长度。
代码及解读
"""Minimal jobshop example."""
import collections
from ortools.sat.python import cp_model
def main():
# 创立数据源
jobs_data = [# task = (machine_id, processing_time).
[(0, 3), (1, 2), (2, 2)], # Job0
[(0, 2), (2, 1), (1, 4)], # Job1
[(1, 4), (2, 3)] # Job2
]
#机器数
machines_count = 1 + max(task[0] for job in jobs_data for task in job)
#机器 ID list
all_machines = range(machines_count)
#计算耗费时长的最大值
horizon = sum(task[1] for job in jobs_data for task in job)
model = cp_model.CpModel()
# 定义一个带有 'start end interval' 三个名称域的 tuple 子类
task_type = collections.namedtuple('task_type', 'start end interval')
# 定义一个带有 'start job index duration' 四个名称域的 tuple 子类
assigned_task_type = collections.namedtuple('assigned_task_type',
'start job index duration')
# 工作字典,job_id, task_id 示意 key,value 为 task_type 的实例
all_tasks = {}
machine_to_intervals = collections.defaultdict(list)
for job_id, job in enumerate(jobs_data):
for task_id, task in enumerate(job):
machine = task[0]
duration = task[1]
suffix = '_%i_%i' % (job_id, task_id)
# 决策变量
start_var = model.NewIntVar(0, horizon, 'start' + suffix)
end_var = model.NewIntVar(0, horizon, 'end' + suffix)
# 创立一个带有起起点窗口的窗口变量,依赖于决策变量 start_var、end_var
interval_var = model.NewIntervalVar(start_var, duration, end_var,
'interval' + suffix)
# 工作字典,job_id, task_id 示意 key,value 为 task_type 的实例
all_tasks[job_id, task_id] = task_type(start=start_var,
end=end_var,
interval=interval_var)
#将雷同机器的工作放在同一个 list 中
machine_to_intervals[machine].append(interval_var)
#束缚:机器外部不能存在工作工夫窗重合.
for machine in all_machines:
model.AddNoOverlap(machine_to_intervals[machine])
#束缚:job 内 task 程序束缚
for job_id, job in enumerate(jobs_data):
for task_id in range(len(job) - 1):
model.Add(all_tasks[job_id, task_id +
1].start >= all_tasks[job_id, task_id].end)
# 定义指标函数,这里用一个变量 + 束缚的模式定义指标函数.
obj_var = model.NewIntVar(0, horizon, 'makespan')
#束缚:obj_var 与所有工作完结工夫 list 的最大值雷同
model.AddMaxEquality(obj_var, [all_tasks[job_id, len(job) - 1].end
for job_id, job in enumerate(jobs_data)
])
model.Minimize(obj_var)
# Creates the solver and solve.
solver = cp_model.CpSolver()
status = solver.Solve(model)
if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:
print('Solution:')
# Create one list of assigned tasks per machine.
assigned_jobs = collections.defaultdict(list)
for job_id, job in enumerate(jobs_data):
for task_id, task in enumerate(job):
machine = task[0]
assigned_jobs[machine].append(
assigned_task_type(start=solver.Value(all_tasks[job_id, task_id].start),
job=job_id,
index=task_id,
duration=task[1]))
# Create per machine output lines.
output = ''
for machine in all_machines:
# Sort by starting time.
assigned_jobs[machine].sort()
sol_line_tasks = 'Machine' + str(machine) + ':'
sol_line = ' '
for assigned_task in assigned_jobs[machine]:
name = 'job_%i_task_%i' % (assigned_task.job,
assigned_task.index)
# Add spaces to output to align columns.
sol_line_tasks += '%-15s' % name
start = assigned_task.start
duration = assigned_task.duration
sol_tmp = '[%i,%i]' % (start, start + duration)
# Add spaces to output to align columns.
sol_line += '%-15s' % sol_tmp
sol_line += '\n'
sol_line_tasks += '\n'
output += sol_line_tasks
output += sol_line
# Finally print the solution found.
print(f'Optimal Schedule Length: {solver.ObjectiveValue()}')
print(output)
else:
print('No solution found.')
# Statistics.
print('\nStatistics')
print('- conflicts: %i' % solver.NumConflicts())
print('- branches : %i' % solver.NumBranches())
print('- wall time: %f s' % solver.WallTime())
if __name__ == '__main__':
main()
最终,新的调度计划如下方所示,整体耗时从 12 小时升高为 11 小时。