上一篇文章咱们介绍了CP-SAT的简略用法,以及CP-SAT蕴含的不同变量、束缚函数。实际出真知,本篇文章将用几个例子加深对CP-SAT应用的了解。

1.员工排班

问题形容

医院主管须要为四名护士创立一个为期三天的时间表,但必须满足以下条件:

每天被分成三个8小时的班次。
每天,每个轮班调配给一名护士,没有护士的工作超过一班。
在三天的工作期间,每个护士至多要上两班。

代码及解读

from ortools.sat.python import cp_modelclass NursesPartialSolutionPrinter(cp_model.CpSolverSolutionCallback):    def __init__(self, shifts, num_nurses, num_days, num_shifts, sols):        cp_model.CpSolverSolutionCallback.__init__(self)        self._shifts = shifts        self._num_nurses = num_nurses        self._num_days = num_days        self._num_shifts = num_shifts        self._solutions = set(sols)        self._solution_count = 0    #回调函数,每产生一个solution会调用一次回调函数。这里对回调函数进行重写。    def on_solution_callback(self):        self._solution_count += 1        if self._solution_count in self._solutions:            print('Solution %i' % self._solution_count)            for d in range(self._num_days):                print('Day %i' % d)                for n in range(self._num_nurses):                    is_working = False                    for s in range(self._num_shifts):                        if self.Value(self._shifts[(n, d, s)]):                            is_working = True                            print('  Nurse %i works shift %i' % (n, s))                    if not is_working:                        print('  Nurse {} does not work'.format(n))            print()    def solution_count(self):        return self._solution_countdef main():    # Data.    num_nurses = 4    # 护士数量    num_shifts = 3    # 每天班次    num_days = 3      # 一共要排多少天班    all_nurses = list(range(num_nurses))    all_shifts = list(range(num_shifts))    all_days = list(range(num_days))    # 创立cp-sat求解器    model = cp_model.CpModel()    # 创立0-1布尔变量变量,shifts[(n, d, s)]:示意护士n被调配到第d天的第s个班次    shifts = {}    for n in all_nurses:        for d in all_days:            for s in all_shifts:                shifts[(n, d, s)] = model.NewBoolVar('shift_n%id%is%i' % (n, d, s))    # 增加束缚,每个班次都必须有一名护士    for d in all_days:        for s in all_shifts:            model.Add(sum(shifts[(n, d, s)] for n in all_nurses) == 1)    # 增加束缚,每个护士一天最多只能做一个班次    for n in all_nurses:        for d in all_days:            model.Add(sum(shifts[(n, d, s)] for s in all_shifts) <= 1)    # 如何尽可能均匀地调配护士轮班,因为三天有九班,咱们能够给四名护士每人调配两班。    # 之后还会有一个班次,能够调配给任何护士。    # 以下代码确保每位护士在三天的工作期间至多上两班。    # min_shifts_per_nurse 每个护士至多调配的班次数量    # max_shifts_per_nurse 每个护士至少调配的班次数量    min_shifts_per_nurse = (num_shifts * num_days) // num_nurses    max_shifts_per_nurse = min_shifts_per_nurse + 1    for n in all_nurses:        num_shifts_worked = sum(            shifts[(n, d, s)] for d in all_days for s in all_shifts)        model.Add(min_shifts_per_nurse <= num_shifts_worked)        model.Add(num_shifts_worked <= max_shifts_per_nurse)    # 求解问题    solver = cp_model.CpSolver()    # 展现后果(只展现5条后果)    a_few_solutions = range(5)    solution_printer = NursesPartialSolutionPrinter(        shifts, num_nurses, num_days, num_shifts, a_few_solutions)    solver.SearchForAllSolutions(model, solution_printer)    # 统计分析.    print()    print('Statistics')    print('  - conflicts       : %i' % solver.NumConflicts())    print('  - branches        : %i' % solver.NumBranches())    print('  - wall time       : %f ms' % solver.WallTime())    print('  - solutions found : %i' % solution_printer.solution_count())if __name__ == '__main__':    main()

2.车间调度

问题形容

作业车间中,在多台机器上解决多个作业。每个作业由一系列工作组成,这些工作必须依照给定的程序执行,并且每个工作必须在特定的机器上进行解决。例如,该工作能够是生产单个消费品,例如汽车。问题是将工作安顿在机器上,以最小化打算的长度——所有作业实现所需的工夫。

作业车间问题有几个限度:

  • 在前一个作业工作实现之前,不能启动该作业的任何工作。
  • 一台机器一次只能解决一项工作。
  • 工作一旦启动,就必须始终运行到实现。

上面是工作车间问题一个简略的例子,每个工作由一对数字(m,p)示意,m是机器的数量,p示意工作的解决工夫。(作业和机器的编号从0开始。)

job 0 = [(0, 3), (1, 2), (2, 2)]
job 1 = [(0, 2), (2, 1), (1, 4)]
job 2 = [(1, 4), (2, 3)]

job 0有三个工作,第一个(0,3)必须在机器0上以3个单位的工夫进行解决,第二个(1,2)必须在机器1上以2个单位的工夫解决,以此类推,总共有八个工作。

作业车间问题的后果是为每个任务分配一个启动工夫,该工夫满足下面给出的约束条件。下图显示了该问题的一种可能的解决方案:

问题的变量和束缚

决策变量:让task(i, j)示意作业i序列中的第j个工作,定义为t_i,j,问题的决策变量为特定工作t的开始工夫。

作业车间问题有两种类型的束缚:

优先束缚:这是因为对于同一作业中的任意两个间断工作,必须在启动第二个工作之前实现第一个工作。例如,task(0,2)和task(0,3)是job 0的间断工作。因为task(0,2)的解决工夫为2,因而task(0,3)的开始工夫必须在task 2的开始工夫之后至多为2个单位的工夫。后果失去如下束缚:

\( t_{0,2}+2<=t_{0,3} \)

没有重叠的束缚:这是因为机器不能同时解决两个工作。例如,task(0,2)和task(2,1)都在机器1上解决。因为它们的解决工夫别离为2和4,因而必须满足以下条件之一:

\( t_{0,2}+2<=t_{2,1} \) ,如果task(0,2)在task(2,1)之前被调度

\( t_{2,1}+4<=t_{0,2} \) ,如果task(2,1)在task(0,2)之前被调度

指标函数

作业车间问题的指标是最小化实现工夫:从作业的最早开始工夫到最近完结工夫的工夫长度。

代码及解读

"""Minimal jobshop example."""import collectionsfrom ortools.sat.python import cp_modeldef main():    # 创立数据源    jobs_data = [  # task = (machine_id, processing_time).        [(0, 3), (1, 2), (2, 2)],  # Job0        [(0, 2), (2, 1), (1, 4)],  # Job1        [(1, 4), (2, 3)]  # Job2    ]    #机器数    machines_count = 1 + max(task[0] for job in jobs_data for task in job)    #机器ID list    all_machines = range(machines_count)    #计算耗费时长的最大值    horizon = sum(task[1] for job in jobs_data for task in job)    model = cp_model.CpModel()    # 定义一个带有'start end interval'三个名称域的tuple子类    task_type = collections.namedtuple('task_type', 'start end interval')    # 定义一个带有'start job index duration'四个名称域的tuple子类    assigned_task_type = collections.namedtuple('assigned_task_type',                                                'start job index duration')    # 工作字典,job_id, task_id示意key,value为task_type的实例    all_tasks = {}    machine_to_intervals = collections.defaultdict(list)    for job_id, job in enumerate(jobs_data):        for task_id, task in enumerate(job):            machine = task[0]            duration = task[1]            suffix = '_%i_%i' % (job_id, task_id)            # 决策变量            start_var = model.NewIntVar(0, horizon, 'start' + suffix)            end_var = model.NewIntVar(0, horizon, 'end' + suffix)            # 创立一个带有起起点窗口的窗口变量,依赖于决策变量start_var、end_var            interval_var = model.NewIntervalVar(start_var, duration, end_var,                                                'interval' + suffix)            # 工作字典,job_id, task_id示意key,value为task_type的实例            all_tasks[job_id, task_id] = task_type(start=start_var,                                                   end=end_var,                                                   interval=interval_var)            #将雷同机器的工作放在同一个list中            machine_to_intervals[machine].append(interval_var)    #束缚:机器外部不能存在工作工夫窗重合.    for machine in all_machines:        model.AddNoOverlap(machine_to_intervals[machine])    #束缚:job内task程序束缚    for job_id, job in enumerate(jobs_data):        for task_id in range(len(job) - 1):            model.Add(all_tasks[job_id, task_id +                                1].start >= all_tasks[job_id, task_id].end)    # 定义指标函数,这里用一个变量+束缚的模式定义指标函数.    obj_var = model.NewIntVar(0, horizon, 'makespan')    #束缚:obj_var与所有工作完结工夫list的最大值雷同    model.AddMaxEquality(obj_var, [        all_tasks[job_id, len(job) - 1].end        for job_id, job in enumerate(jobs_data)    ])    model.Minimize(obj_var)    # Creates the solver and solve.    solver = cp_model.CpSolver()    status = solver.Solve(model)    if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:        print('Solution:')        # Create one list of assigned tasks per machine.        assigned_jobs = collections.defaultdict(list)        for job_id, job in enumerate(jobs_data):            for task_id, task in enumerate(job):                machine = task[0]                assigned_jobs[machine].append(                    assigned_task_type(start=solver.Value(                        all_tasks[job_id, task_id].start),                                       job=job_id,                                       index=task_id,                                       duration=task[1]))        # Create per machine output lines.        output = ''        for machine in all_machines:            # Sort by starting time.            assigned_jobs[machine].sort()            sol_line_tasks = 'Machine ' + str(machine) + ': '            sol_line = '           '            for assigned_task in assigned_jobs[machine]:                name = 'job_%i_task_%i' % (assigned_task.job,                                           assigned_task.index)                # Add spaces to output to align columns.                sol_line_tasks += '%-15s' % name                start = assigned_task.start                duration = assigned_task.duration                sol_tmp = '[%i,%i]' % (start, start + duration)                # Add spaces to output to align columns.                sol_line += '%-15s' % sol_tmp            sol_line += '\n'            sol_line_tasks += '\n'            output += sol_line_tasks            output += sol_line        # Finally print the solution found.        print(f'Optimal Schedule Length: {solver.ObjectiveValue()}')        print(output)    else:        print('No solution found.')    # Statistics.    print('\nStatistics')    print('  - conflicts: %i' % solver.NumConflicts())    print('  - branches : %i' % solver.NumBranches())    print('  - wall time: %f s' % solver.WallTime())if __name__ == '__main__':    main()

最终,新的调度计划如下方所示,整体耗时从12小时升高为11小时。