乐趣区

Spring-cloud-一步步实现广告系统-14-全量索引代码实现

上一节我们实现了索引基本操作的类以及索引缓存工具类,本小节我们开始实现加载全量索引数据,在加载全量索引数据之前,我们需要先将数据库中的表数据导出到一份文件中。Let’s code.

1. 首先定义一个常量类,用来存储导出文件存储的目录和文件名称

因为我们导出的文件需要在搜索服务中使用到,因此,我们将文件名 & 目录以及导出对象的信息编写在 mscx-ad-commom 项目中。

public class FileConstant {
    public static final String DATA_ROOT_DIR = "/Users/xxx/Documents/promotion/data/mysql/";

    // 各个表数据的存储文件名
    public static final String AD_PLAN = "ad_plan.data";
    public static final String AD_UNIT = "ad_unit.data";
    public static final String AD_CREATIVE = "ad_creative.data";
    public static final String AD_CREATIVE_RELARION_UNIT = "ad_creative_relation_unit.data";
    public static final String AD_UNIT_HOBBY = "ad_unit_hobby.data";
    public static final String AD_UNIT_DISTRICT = "ad_unit_district.data";
    public static final String AD_UNIT_KEYWORD = "ad_unit_keyword.data";
}

2. 定义索引对象导出的字段信息, 依然用 Ad_Plan 为例。

/**
 * AdPlanTable for 需要导出的表字段信息 => 是搜索索引字段一一对应
 *
 * @author <a href="mailto:magicianisaac@gmail.com">Isaac.Zhang | 若初 </a>
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class AdPlanTable {
    private Long planId;
    private Long userId;
    private Integer planStatus;
    private Date startDate;
    private Date endDate;
}

3. 导出文件服务实现

同样,最好的实现方式就是将导出服务作为一个子工程来独立运行,我这里直接实现在了 mscx-ad-db 项目中

  • 定义一个空接口,为了符合我们的编码规范
/**
 * IExportDataService for 导出数据库广告索引初始化数据
 *
 * @author <a href="mailto:magicianisaac@gmail.com">Isaac.Zhang | 若初 </a>
 */
public interface IExportDataService {}
  • 实现 service
@Slf4j
@Service
public class ExportDataServiceImpl implements IExportDataService {

    @Autowired
    private AdPlanRepository planRepository;

    /**
     * 导出 {@code AdPlan} from DB to File
     *
     * @param fileName 文件名称
     */
    public void exportAdPlanTable(String fileName) {List<AdPlan> planList = planRepository.findAllByPlanStatus(CommonStatus.VALID.getStatus());
        if (CollectionUtils.isEmpty(planList)) {return;}

        List<AdPlanTable> planTables = new ArrayList<>();
        planList.forEach(item -> planTables.add(
                new AdPlanTable(item.getPlanId(),
                        item.getUserId(),
                        item.getPlanStatus(),
                        item.getStartDate(),
                        item.getEndDate())
        ));

        // 将数据写入文件
        Path path = Paths.get(fileName);
        try (BufferedWriter writer = Files.newBufferedWriter(path)) {for (AdPlanTable adPlanTable : planTables) {writer.write(JSON.toJSONString(adPlanTable));
                writer.newLine();}
            writer.close();} catch (IOException e) {e.printStackTrace();
            log.error("export AdPlanTable Exception!");
        }
    }
}
  • 实现 Controller,提供操作入口
@Slf4j
@Controller
@RequestMapping("/export")
public class ExportDataController {
    private final ExportDataServiceImpl exportDataService;

    @Autowired
    public ExportDataController(ExportDataServiceImpl exportDataService) {this.exportDataService = exportDataService;}

    @GetMapping("/export-plan")
    public CommonResponse exportAdPlans() {exportDataService.exportAdPlanTable(String.format("%s%s", FileConstant.DATA_ROOT_DIR, FileConstant.AD_PLAN));
        return new CommonResponse();}
}
  • 结果文件内容如下,每一行都代表了一个推广计划
{"endDate":1561438800000,"planId":10,"planStatus":1,"startDate":1561438800000,"userId":10}
{"endDate":1561438800000,"planId":11,"planStatus":1,"startDate":1561438800000,"userId":10}
根据文件内容构建索引

我们在之前编写索引服务的时候,创建了一些索引需要使用的实体对象类,比如构建推广计划索引的时候,需要使用到的实体对象com.sxzhongf.ad.index.adplan.AdPlanIndexObject,可是呢,我们在上一节实现索引导出的时候,实体对象又是 common 包中的com.sxzhongf.ad.common.export.table.AdPlanTable,读取出来文件中的数据只能反序列化为JSON.parseObject(p, AdPlanTable.class),我们需要将 2 个对象做相互映射才能创建索引信息。

1. 首先我们定义一个操作类型枚举,代表我们每一次的操作类型(也需要对应到后期 binlog 监听的操作类型

public enum OperationTypeEnum {
    ADD,
    UPDATE,
    DELETE,
    OTHER;

    public static OperationTypeEnum convert(EventType type) {switch (type) {
            case EXT_WRITE_ROWS:
                return ADD;
            case EXT_UPDATE_ROWS:
                return UPDATE;
            case EXT_DELETE_ROWS:
                return DELETE;
            default:
                return OTHER;
        }
    }
}

2. 因为全量索引的加载和增量索引加载的本质是一样的,全量索引其实就是一种特殊的增量索引,为了代码的可复用,我们创建统一的类来操作索引。

/**
 * AdLevelDataHandler for 通用处理索引类
 * 1. 索引之间存在层级划分,也就是相互之间拥有依赖关系的划分
 * 2. 加载全量索引其实是增量索引 "添加" 的一种特殊实现
 *
 * @author <a href="mailto:magicianisaac@gmail.com">Isaac.Zhang | 若初 </a>
 */
@Slf4j
public class AdLevelDataHandler {

    /**
     * 实现广告推广计划的第二层级索引实现。*(第一级为用户层级,但是用户层级不参与索引,所以从 level 2 开始)* 第二层级的索引是表示 不依赖于其他索引,但是可被其他索引所依赖
     */
    public static void handleLevel2Index(AdPlanTable adPlanTable, OperationTypeEnum type) {
          // 对象转换
        AdPlanIndexObject planIndexObject = new AdPlanIndexObject(adPlanTable.getPlanId(),
                adPlanTable.getUserId(),
                adPlanTable.getPlanStatus(),
                adPlanTable.getStartDate(),
                adPlanTable.getEndDate());

        // 调用通用方法处理,使用 IndexDataTableUtils#of 来获取索引的实现类 bean
        handleBinlogEvent(
                      // 在前一节我们实现了一个索引工具类,来获取注入的 bean 对象
                IndexDataTableUtils.of(AdPlanIndexAwareImpl.class),
                planIndexObject.getPlanId(),
                planIndexObject,
                type
        );
    }

    /**
     * 处理全量索引和增量索引的通用处理方式
     * K,V 代表索引的键和值
     *
     * @param index 索引实现代理类父级
     * @param key   键
     * @param value 值
     * @param type  操作类型
     */
    private static <K, V> void handleBinlogEvent(IIndexAware<K, V> index, K key, V value, OperationTypeEnum type) {switch (type) {
            case ADD:
                index.add(key, value);
                break;
            case UPDATE:
                index.update(key, value);
                break;
            case DELETE:
                index.delete(key, value);
                break;
            default:
                break;
        }
    }
}

3. 读取文件实现全量索引加载。

因为我们文件加载之前需要依赖另一个组件,也就是我们的索引工具类,需要添加上 @DependsOn("indexDataTableUtils"),全量索引在系统启动的时候就需要加载,我们需要添加@PostConstruct 来实现初始化加载,被 @PostConstruct 修饰的方法会在服务器加载 Servlet 的时候运行,并且只会被服务器调用一次。

@Component
@DependsOn("indexDataTableUtils")
public class IndexFileLoader {

    /**
     * 服务启动时,执行全量索引加载
     */
    @PostConstruct
    public void init() {
        // 加载 推广计划
        List<String> adPlanStrings = loadExportedData(String.format("%s%s",
                FileConstant.DATA_ROOT_DIR, FileConstant.AD_PLAN
        ));
        adPlanStrings.forEach(p -> AdLevelDataHandler.handleLevel2Index(JSON.parseObject(p, AdPlanTable.class), OperationTypeEnum.ADD
        ));
    }

    /**
     * <h3> 读取全量索引加载需要的文件 </h3>
     *
     * @param fileName 文件名称
     * @return 文件行数据
     */
    private List<String> loadExportedData(String fileName) {try (BufferedReader reader = Files.newBufferedReader(Paths.get(fileName))) {return reader.lines().collect(Collectors.toList());
        } catch (IOException e) {throw new RuntimeException(e.getMessage());
        }
    }
}

Tips

在实现初始化加载全量索引的过程中,一定要保证数据加载的顺序问题,因为不同的数据有可能存在着相互依赖的关联关系,一旦顺序写错,会造成程序报错问题。

退出移动版