ODI 操作详解
一致性 CDC(Last Update Date)同步数据
添加 CDC
1. 检查数据库版本
DECLARE -- Validating KM options
dbVersion1 NUMBER(2,0);
dbVersion2 NUMBER(2,0);
BEGIN
-- Verify RDBMS version
select
case when upper(name) = 'COMPATIBLE' then to_number(substr(value, 1, instr(value, '.',1,1)-1)) else null end v0,
case when upper(name) = 'COMPATIBLE' then to_number(substr(value, instr(value, '.',1,1)+1, instr(value, '.',1,2)-instr(value, '.',1,1)-1)) else null end v1
into dbVersion1, dbVersion2
from v$parameter
where upper(name) = 'COMPATIBLE';
IF dbVersion1 < 9 THEN
raise_application_error(-20101, 'ODIKM-ORA-10000: You are using RDBMS version'||dbVersion1||'.'||dbVersion2||'. This version does not support JKM COMPATIBLE= 9.');
END IF;
END;
2. 获取连接
import java.sql as sql
import java.lang as lang
# Connect to the Source Database ie the one that hosts the journalized tables
myCon = odiRef.getJDBCConnection("SRC")
# Create the list of Subscribers for a CDC Set
lstSbs = []
lstSbs.append('Weichao')
3. 删除 SNP_CDC_SUBS 中的 CDC 值
delCmd = """
delete from FUSION.SNP_CDC_SUBS CDC
where CDC.CDC_SET_NAME = 'FUSION.FUSIONTABLEUPDATEDEMO'
and CDC.CDC_SUBSCRIBER = ?
"""
# Prepare the delete
prepDelStmt = myCon.prepareStatement(delCmd)
# Execute it for each subscriber
for sbs in lstSbs:
prepDelStmt.setString(1, sbs)
nbRows = prepDelStmt.executeUpdate()
prepDelStmt.close()
# Commit the updates
myCon.commit()
4. 每次更改此表的内容后,我们都会重新计算统计信息
/* After each change on the content of this table we recompute the statistics */
/* Statistics are always up to date on this table */
begin
dbms_stats.gather_table_stats(
ownname => 'FUSION',
tabname => 'SNP_CDC_SUBS',
estimate_percent => dbms_stats.auto_sample_size
);
end;
5. 关闭连接
myCon.close()
添加订阅
1. 检查数据库版本
DECLARE -- Validating KM options
dbVersion1 NUMBER(2,0);
dbVersion2 NUMBER(2,0);
BEGIN
-- Verify RDBMS version
select
case when upper(name) = 'COMPATIBLE' then to_number(substr(value, 1, instr(value, '.',1,1)-1)) else null end v0,
case when upper(name) = 'COMPATIBLE' then to_number(substr(value, instr(value, '.',1,1)+1, instr(value, '.',1,2)-instr(value, '.',1,1)-1)) else null end v1
into dbVersion1, dbVersion2
from v$parameter
where upper(name) = 'COMPATIBLE';
IF dbVersion1 < 9 THEN
raise_application_error(-20101, 'ODIKM-ORA-10000: You are using RDBMS version'||dbVersion1||'.'||dbVersion2||'. This version does not support JKM COMPATIBLE= 9.');
END IF;
END;
2. 设置参数,获取数据库连接
import java.sql as sql
import java.lang as lang
# Connect to the Source Database ie the one that hosts the journalized tables
myCon = odiRef.getJDBCConnection("SRC")
# Create the list of Subscribers for a CDC Set
lstSbs = []
lstSbs.append('Zero')
3. 创建表 SNP_CDC_SUBS,由于已经创建,会报出异常
/* This table is created in the work physical schema set as default for the Data server */
create table FUSION.SNP_CDC_SUBS
(CDC_SET_NAME VARCHAR2(255 CHAR) not null, /* Name of the CDC Set (ODI model code following the Local Mask syntax) */
CDC_SUBSCRIBER VARCHAR2(400 CHAR) not null, /* Name of the subscriber who subscribed to the CDC Set */
CDC_REFDATE DATE not null, /* Last update of the record */
MIN_WINDOW_ID NUMBER(20) not null, /* Window Ids under this one should be ignored */
MAX_WINDOW_ID NUMBER(20) not null, /* Maximum Window Id used by this subscription */
MAX_WINDOW_ID_DEL NUMBER(20) not null, /* Maximum Window Id to take into consideration when looking at consistency for deletes */
MAX_WINDOW_ID_INS NUMBER(20) not null, /* Maximum Window Id to take into consideration when looking at consistency for inserts / updates */
CDC_ROW_COUNT NUMBER(10) not null, /* Number of rows in the journalizing tables for this subscription */
constraint PK_SNP_CDC_SBS primary key (CDC_SET_NAME, CDC_SUBSCRIBER)
)
JKM 具体执行过程如下,勾选了忽略错误,所以上面的创建报错忽略
4. 设置字段长度
alter table FUSION.SNP_CDC_SUBS modify (CDC_SUBSCRIBER VARCHAR2(400 CHAR))
5. 设置字段长度 表不存在,会报错
alter table FUSION.SNP_CDC_SUBS_AE modify (CDC_SUBSCRIBER VARCHAR2(400 CHAR))
6. 创建 CDC_SET 表 表已存在,会报错
/* This table is created in the work physical schema set as default for the Data server */
create table FUSION.SNP_CDC_SET
(CDC_SET_NAME VARCHAR2(255 CHAR) not null, /* Name of the CDC Set (ODI model code following the Local Mask syntax) */
CUR_WINDOW_ID NUMBER(20) not null, /* Last window Id that has been used */
CUR_WINDOW_ID_DEL NUMBER(20) not null, /* Last Window Id used to compute delete consitency */
CUR_WINDOW_ID_INS NUMBER(20) not null, /* Last Window Id used to compute inert / update consitency */
RETRIEVE_DATA VARCHAR2(2000 CHAR) null, /* Command to execute in order to retrieve the journal data (used by the OdiRetrieveJournalData tool) */
REFRESH_ROW_COUNT VARCHAR2(2000 CHAR) null, /* Command to execute in order to refresh the row count(used by the OdiRefreshJournalData tool) */
constraint PK_SNP_CDC_SET primary key (CDC_SET_NAME)
)
/* This table is created in the work physical schema set as default for the Data server */
create table FUSION.SNP_CDC_SET
(CDC_SET_NAME VARCHAR2(255 CHAR) not null, /* Name of the CDC Set (ODI model code following the Local Mask syntax) */
CUR_WINDOW_ID NUMBER(20) not null, /* Last window Id that has been used */
CUR_WINDOW_ID_DEL NUMBER(20) not null, /* Last Window Id used to compute delete consitency */
CUR_WINDOW_ID_INS NUMBER(20) not null, /* Last Window Id used to compute inert / update consitency */
RETRIEVE_DATA VARCHAR2(2000 CHAR) null, /* Command to execute in order to retrieve the journal data (used by the OdiRetrieveJournalData tool) */
REFRESH_ROW_COUNT VARCHAR2(2000 CHAR) null, /* Command to execute in order to refresh the row count(used by the OdiRefreshJournalData tool) */
constraint PK_SNP_CDC_SET primary key (CDC_SET_NAME)
)
7. 创建表 SNP_CDC_SET_TABLE 已存在,会报错
/* This table is created in the work physical schema set as default for the Data server */
create table FUSION.SNP_CDC_SET_TABLE
(CDC_SET_NAME VARCHAR2(255 CHAR) not null, /* Name of the CDC Set (ODI model code following the Local Mask syntax) */
FULL_TABLE_NAME VARCHAR2(255 CHAR) not null, /* Complete name of the journalized table */
FULL_DATA_VIEW VARCHAR2(255 CHAR) not null, /* Complete name of the journalizing data view */
RETRIEVE_DATA VARCHAR2(2000 CHAR) null, /* Command to execute to retrieve the journal data (used by the OdiRetrieveJournalData tool) */
REFRESH_ROW_COUNT VARCHAR2(2000 CHAR) null, /* Command to execute to refresh the row count (used by the OdiRefreshRowCount tool) */
CDC_LAST_DATE TIMESTAMP,
CDC_CURRENT_DATE TIMESTAMP,
constraint PK_SNP_CDC_TBL primary key (FULL_TABLE_NAME)
)
8.SNP_CDC_SET_TABLE 添加列 已添加,会报错
alter table FUSION.SNP_CDC_SET_TABLE add
(
CDC_LAST_DATE TIMESTAMP,
CDC_CURRENT_DATE TIMESTAMP
)
9. 创建表 SNP_CDC_OBJECTS 表已创建,会报错
/* This table is created in the work physical schema set as default for the Data server */
create table FUSION.SNP_CDC_OBJECTS
(FULL_TABLE_NAME VARCHAR2(255 CHAR) not null, /* Complete name of the journalized table */
CDC_OBJECT_TYPE VARCHAR2(35 CHAR) not null, /* Type of the object in the CDC framework */
FULL_OBJECT_NAME VARCHAR2(255 CHAR) not null, /* Complete name of the object */
DB_OBJECT_TYPE VARCHAR2(35 CHAR) null, /* RDBMS type of the object (ex : TABLE, VIEW, TRIGGER) */
constraint PK_SNP_CDC_OBJ primary key (FULL_TABLE_NAME, CDC_OBJECT_TYPE)
)
10. 向 CDC_SET 表中添加
insert into FUSION.SNP_CDC_SET
(
CDC_SET_NAME,
CUR_WINDOW_ID,
CUR_WINDOW_ID_DEL,
CUR_WINDOW_ID_INS,
RETRIEVE_DATA,
REFRESH_ROW_COUNT
)
values
(
/* CDC_SET_NAME */ 'FUSION.FUSIONTABLEUPDATEDEMO',
/* CUR_WINDOW_ID */ -1,
/* CUR_WINDOW_ID_DEL */ -1,
/* CUR_WINDOW_ID_INS */ -1,
/* RETRIEVE_DATA */ NULL,
/* REFRESH_ROW_COUNT */ 'update FUSION.SNP_CDC_SUBS set CDC_ROW_COUNT = 0' ||
'where CDC_SUBSCRIBER =''$$SUBSCRIBER_NAME$$''' ||'and CDC_SET_NAME = ''FUSION.FUSIONTABLEUPDATEDEMO'' '
)
11. 更新 SNP_CDC_SET 表
update FUSION.SNP_CDC_SET CDC
set CDC.RETRIEVE_DATA = NULL,
CDC.REFRESH_ROW_COUNT = 'update FUSION.SNP_CDC_SUBS set CDC_ROW_COUNT = 0' ||
'where CDC_SUBSCRIBER =''$$SUBSCRIBER_NAME$$''' ||'and CDC_SET_NAME = ''FUSION.FUSIONTABLEUPDATEDEMO'' 'where CDC.CDC_SET_NAME ='FUSION.FUSIONTABLEUPDATEDEMO'
12. 向 SNP_CDC_SUBS 中插入数据
insCmd = """
insert into FUSION.SNP_CDC_SUBS
(
CDC_SET_NAME,
CDC_SUBSCRIBER,
CDC_REFDATE,
MIN_WINDOW_ID,
MAX_WINDOW_ID,
MAX_WINDOW_ID_DEL,
MAX_WINDOW_ID_INS,
CDC_ROW_COUNT
)
select /* CDC_SET_NAME */ 'FUSION.FUSIONTABLEUPDATEDEMO',
/* CDC_SUBSCRIBER */ ?,
/* CDC_REFDATE */ sysdate,
/* MIN_WINDOW_ID */ CDC.CUR_WINDOW_ID,
/* MAX_WINDOW_ID */ -1,
/* MAX_WINDOW_ID_DEL */ -1,
/* MAX_WINDOW_ID_INS */ -1,
/* CDC_ROW_COUNT */ 0
from FUSION.SNP_CDC_SET CDC
where CDC.CDC_SET_NAME = 'FUSION.FUSIONTABLEUPDATEDEMO'
"""selCmd ="""
select count('X')
from FUSION.SNP_CDC_SUBS CDC
where CDC.CDC_SET_NAME = 'FUSION.FUSIONTABLEUPDATEDEMO'
and CDC.CDC_SUBSCRIBER = ?
"""
# Prepare the statements
prepInsStmt = myCon.prepareStatement(insCmd)
prepSelStmt = myCon.prepareStatement(selCmd)
# Execute for each subscriber
for sbs in lstSbs:
prepSelStmt.setString(1, sbs)
rs = prepSelStmt.executeQuery()
rs.next()
nbSubs = rs.getInt(1)
if (nbSubs == 0):
prepInsStmt.setString(1, sbs)
nbRows = prepInsStmt.executeUpdate()
prepInsStmt.close()
prepSelStmt.close()
# Commit the updates
myCon.commit()
13. 更新 SNP_CDC_SUBS
updCmd = """
update FUSION.SNP_CDC_SUBS SUBS
set (
SUBS.MAX_WINDOW_ID,
SUBS.MAX_WINDOW_ID_INS,
SUBS.MAX_WINDOW_ID_DEL
) = (
select CDC.CUR_WINDOW_ID,
CDC.CUR_WINDOW_ID_INS,
CDC.CUR_WINDOW_ID_DEL
from FUSION.SNP_CDC_SET CDC
where CDC.CDC_SET_NAME = 'FUSION.FUSIONTABLEUPDATEDEMO'
)
where SUBS.CDC_SET_NAME = 'FUSION.FUSIONTABLEUPDATEDEMO'
and SUBS.CDC_SUBSCRIBER = ?
"""
# Prepare the update
prepUpdStmt = myCon.prepareStatement(updCmd)
# Execute it for each subscriber
for sbs in lstSbs:
prepUpdStmt.setString(1, sbs)
nbRows = prepUpdStmt.executeUpdate()
prepUpdStmt.close()
# Commit the updates
myCon.commit()
14. 更新 SNP_CDC_SUBS
updCmd = """
update FUSION.SNP_CDC_SUBS SUBS
set SUBS.MIN_WINDOW_ID = SUBS.MAX_WINDOW_ID_INS
where SUBS.CDC_SET_NAME = 'FUSION.FUSIONTABLEUPDATEDEMO'
and SUBS.CDC_SUBSCRIBER = ?
"""
# Prepare the update
prepUpdStmt = myCon.prepareStatement(updCmd)
# Execute it for each subscriber
for sbs in lstSbs:
prepUpdStmt.setString(1, sbs)
nbRows = prepUpdStmt.executeUpdate()
prepUpdStmt.close()
# Commit the updates
myCon.commit()
15.
/* After each change on the content of this table we recompute the statistics */
/* Statistics are always up to date on this table */
begin
dbms_stats.gather_table_stats(
ownname => 'FUSION',
tabname => 'SNP_CDC_SUBS',
estimate_percent => dbms_stats.auto_sample_size
);
end;
16. 关闭连接
myCon.close()
开启日记
1. 检查数据库版本
DECLARE -- Validating KM options
dbVersion1 NUMBER(2,0);
dbVersion2 NUMBER(2,0);
BEGIN
-- Verify RDBMS version
select
case when upper(name) = 'COMPATIBLE' then to_number(substr(value, 1, instr(value, '.',1,1)-1)) else null end v0,
case when upper(name) = 'COMPATIBLE' then to_number(substr(value, instr(value, '.',1,1)+1, instr(value, '.',1,2)-instr(value, '.',1,1)-1)) else null end v1
into dbVersion1, dbVersion2
from v$parameter
where upper(name) = 'COMPATIBLE';
IF dbVersion1 < 9 THEN
raise_application_error(-20101, 'ODIKM-ORA-10000: You are using RDBMS version'||dbVersion1||'.'||dbVersion2||'. This version does not support JKM COMPATIBLE= 9.');
END IF;
END;
2. 开启连接
import java.sql as sql
import java.lang as lang
# Connect to the Source Database ie the one that hosts the journalized tables
myCon = odiRef.getJDBCConnection("SRC")
# Create the list of Subscribers for a CDC Set
lstSbs = []
3. 创建表 SNP_CDC_SUBS
/* This table is created in the work physical schema set as default for the Data server */
create table FUSION.SNP_CDC_SUBS
(CDC_SET_NAME VARCHAR2(255 CHAR) not null, /* Name of the CDC Set (ODI model code following the Local Mask syntax) */
CDC_SUBSCRIBER VARCHAR2(400 CHAR) not null, /* Name of the subscriber who subscribed to the CDC Set */
CDC_REFDATE DATE not null, /* Last update of the record */
MIN_WINDOW_ID NUMBER(20) not null, /* Window Ids under this one should be ignored */
MAX_WINDOW_ID NUMBER(20) not null, /* Maximum Window Id used by this subscription */
MAX_WINDOW_ID_DEL NUMBER(20) not null, /* Maximum Window Id to take into consideration when looking at consistency for deletes */
MAX_WINDOW_ID_INS NUMBER(20) not null, /* Maximum Window Id to take into consideration when looking at consistency for inserts / updates */
CDC_ROW_COUNT NUMBER(10) not null, /* Number of rows in the journalizing tables for this subscription */
constraint PK_SNP_CDC_SBS primary key (CDC_SET_NAME, CDC_SUBSCRIBER)
)
4. 设置 SNP_CDC_SUBS 字段 CDC_SUBSCRIBER 长度
alter table FUSION.SNP_CDC_SUBS modify (CDC_SUBSCRIBER VARCHAR2(400 CHAR))
5. 设置 SNP_CDC_SUBS_AE 字段 CDC_SUBSCRIBER 长度
alter table FUSION.SNP_CDC_SUBS_AE modify (CDC_SUBSCRIBER VARCHAR2(400 CHAR))
6. 创建表 SNP_CDC_SET
/* This table is created in the work physical schema set as default for the Data server */
create table FUSION.SNP_CDC_SET
(CDC_SET_NAME VARCHAR2(255 CHAR) not null, /* Name of the CDC Set (ODI model code following the Local Mask syntax) */
CUR_WINDOW_ID NUMBER(20) not null, /* Last window Id that has been used */
CUR_WINDOW_ID_DEL NUMBER(20) not null, /* Last Window Id used to compute delete consitency */
CUR_WINDOW_ID_INS NUMBER(20) not null, /* Last Window Id used to compute inert / update consitency */
RETRIEVE_DATA VARCHAR2(2000 CHAR) null, /* Command to execute in order to retrieve the journal data (used by the OdiRetrieveJournalData tool) */
REFRESH_ROW_COUNT VARCHAR2(2000 CHAR) null, /* Command to execute in order to refresh the row count(used by the OdiRefreshJournalData tool) */
constraint PK_SNP_CDC_SET primary key (CDC_SET_NAME)
)
7. 创建表 SNP_CDC_SET_TABLE
/* This table is created in the work physical schema set as default for the Data server */
create table FUSION.SNP_CDC_SET_TABLE
(CDC_SET_NAME VARCHAR2(255 CHAR) not null, /* Name of the CDC Set (ODI model code following the Local Mask syntax) */
FULL_TABLE_NAME VARCHAR2(255 CHAR) not null, /* Complete name of the journalized table */
FULL_DATA_VIEW VARCHAR2(255 CHAR) not null, /* Complete name of the journalizing data view */
RETRIEVE_DATA VARCHAR2(2000 CHAR) null, /* Command to execute to retrieve the journal data (used by the OdiRetrieveJournalData tool) */
REFRESH_ROW_COUNT VARCHAR2(2000 CHAR) null, /* Command to execute to refresh the row count (used by the OdiRefreshRowCount tool) */
CDC_LAST_DATE TIMESTAMP,
CDC_CURRENT_DATE TIMESTAMP,
constraint PK_SNP_CDC_TBL primary key (FULL_TABLE_NAME)
)
8.SNP_CDC_SET_TABLE 添加字段
alter table FUSION.SNP_CDC_SET_TABLE add
(
CDC_LAST_DATE TIMESTAMP,
CDC_CURRENT_DATE TIMESTAMP
)
9. 建表 SNP_CDC_OBJECTS
/* This table is created in the work physical schema set as default for the Data server */
create table FUSION.SNP_CDC_OBJECTS
(FULL_TABLE_NAME VARCHAR2(255 CHAR) not null, /* Complete name of the journalized table */
CDC_OBJECT_TYPE VARCHAR2(35 CHAR) not null, /* Type of the object in the CDC framework */
FULL_OBJECT_NAME VARCHAR2(255 CHAR) not null, /* Complete name of the object */
DB_OBJECT_TYPE VARCHAR2(35 CHAR) null, /* RDBMS type of the object (ex : TABLE, VIEW, TRIGGER) */
constraint PK_SNP_CDC_OBJ primary key (FULL_TABLE_NAME, CDC_OBJECT_TYPE)
)
10. 向 SNP_CDC_SET 中插入数据
insert into FUSION.SNP_CDC_SET
(
CDC_SET_NAME,
CUR_WINDOW_ID,
CUR_WINDOW_ID_DEL,
CUR_WINDOW_ID_INS,
RETRIEVE_DATA,
REFRESH_ROW_COUNT
)
values
(
/* CDC_SET_NAME */ 'FUSION.FUSIONTABLEUPDATEDEMO',
/* CUR_WINDOW_ID */ -1,
/* CUR_WINDOW_ID_DEL */ -1,
/* CUR_WINDOW_ID_INS */ -1,
/* RETRIEVE_DATA */ NULL,
/* REFRESH_ROW_COUNT */ 'update FUSION.SNP_CDC_SUBS set CDC_ROW_COUNT = 0' ||
'where CDC_SUBSCRIBER =''$$SUBSCRIBER_NAME$$''' ||'and CDC_SET_NAME = ''FUSION.FUSIONTABLEUPDATEDEMO'' '
)
11. 更新 SNP_CDC_SET
update FUSION.SNP_CDC_SET CDC
set CDC.RETRIEVE_DATA = NULL,
CDC.REFRESH_ROW_COUNT = 'update FUSION.SNP_CDC_SUBS set CDC_ROW_COUNT = 0' ||
'where CDC_SUBSCRIBER =''$$SUBSCRIBER_NAME$$''' ||'and CDC_SET_NAME = ''FUSION.FUSIONTABLEUPDATEDEMO'' 'where CDC.CDC_SET_NAME ='FUSION.FUSIONTABLEUPDATEDEMO'
12. 删除 SNP_TMP_TABLE_LIST
drop table FUSION.SNP_TMP_TABLE_LIST
13. 创建表 SNP_TMP_TABLE_LIST
create table FUSION.SNP_TMP_TABLE_LIST
(FULL_TABLE_NAME VARCHAR2(255 CHAR) not null
)
14. 向表 SNP_TMP_TABLE_LIST 中插入数据,数据内容是添加日记的表
insert into FUSION.SNP_TMP_TABLE_LIST
(FULL_TABLE_NAME)
values
(/* FULL_TABLE_NAME */ 'FUSION.DMS_TEST_DELET_SOURCE')
15. 创建日志表
/* This command is sent to the database only if the table is not already a part of the set */
/* If you want this command to be always executed, remove the query on the source tab */
create table FUSION.J$DMS_TEST_DELET_SOURCE /* This table is created in the work physical schema of the current Data server */
(WINDOW_ID NUMBER(20) null,
ID NUMBER null
)
16. 向 SNP_CDC_OBJECTS 插入数据,内容是添加日记的表和刚才生成的表
insert into FUSION.SNP_CDC_OBJECTS
(
FULL_TABLE_NAME,
CDC_OBJECT_TYPE,
FULL_OBJECT_NAME,
DB_OBJECT_TYPE
)
values
(
/* FULL_TABLE_NAME */ 'FUSION.DMS_TEST_DELET_SOURCE',
/* CDC_OBJECT_TYPE */ 'JRN_FULL_NAME',
/* FULL_OBJECT_NAME */ 'FUSION.J$DMS_TEST_DELET_SOURCE',
/* DB_OBJECT_TYPE */ 'TABLE'
)
17. 创建视图 JV$DMS_TEST_DELET_SOURCE
create or replace view FUSION.JV$DMS_TEST_DELET_SOURCE /* This view is created in the work physical schema of the current Data server */
as
select
decode(TARG.ROWID, null, 'D', 'I') AS JRN_FLAG,
sysdate AS JRN_DATE, /* For V3 compatibility */
JRN.CDC_SUBSCRIBER AS JRN_SUBSCRIBER, /* Renamed for V3 compatibility */
JRN.ID AS ID
,TARG.HOST AS HOST,
TARG.PORT AS PORT,
TARG.NAME AS NAME,
TARG.APPLICATION AS APPLICATION,
TARG.CONTEXT_ROOT AS CONTEXT_ROOT,
TARG.SERVICE AS SERVICE,
TARG.PATH AS PATH,
TARG.DEPLOYMENT_STATE AS DEPLOYMENT_STATE,
TARG.SOURCE_INFORMATION AS SOURCE_INFORMATION,
TARG.ACTIVE_SERVER_COUNT AS ACTIVE_SERVER_COUNT,
TARG.SERVLETS AS SERVLETS,
TARG.PENDING_REQUESTS AS PENDING_REQUESTS,
TARG.COMPLETED_REQUESTS AS COMPLETED_REQUESTS,
TARG.OPEN_SESSIONS_CURRENT_COUNT AS OPEN_SESSIONS_CURRENT_COUNT,
TARG.OPEN_SESSIONS_HIGH_COUNT AS OPEN_SESSIONS_HIGH_COUNT,
TARG.SESSIONS_OPENED_TOTAL_COUNT AS SESSIONS_OPENED_TOTAL_COUNT,
TARG.OBJECT_VERSION_NUMBER AS OBJECT_VERSION_NUMBER,
TARG.CREATION_DATE AS CREATION_DATE,
TARG.CREATED_BY AS CREATED_BY,
TARG.LAST_UPDATE_DATE AS LAST_UPDATE_DATE,
TARG.LAST_UPDATED_BY AS LAST_UPDATED_BY,
TARG.DATA_STATUS AS DATA_STATUS
from (select JRN.ID,SUB.CDC_SUBSCRIBER, SUB.MAX_WINDOW_ID_INS, max(JRN.WINDOW_ID) WINDOW_ID
from FUSION.J$DMS_TEST_DELET_SOURCE JRN,
FUSION.SNP_CDC_SUBS SUB
where SUB.CDC_SET_NAME = 'FUSION.FUSIONTABLEUPDATEDEMO'
and JRN.WINDOW_ID > SUB.MIN_WINDOW_ID
and JRN.WINDOW_ID <= SUB.MAX_WINDOW_ID_DEL
group by JRN.ID,SUB.CDC_SUBSCRIBER, SUB.MAX_WINDOW_ID_INS) JRN,
FUSION.DMS_TEST_DELET_SOURCE TARG
where
JRN.ID = TARG.ID (+)
and not (
TARG.ROWID is not null
and JRN.WINDOW_ID > JRN.MAX_WINDOW_ID_INS
)
select JRN.ID,SUB.CDC_SUBSCRIBER, SUB.MAX_WINDOW_ID_INS, max(JRN.WINDOW_ID) WINDOW_ID
from FUSION.J$DMS_TEST_DELET_SOURCE JRN,
FUSION.SNP_CDC_SUBS SUB
where SUB.CDC_SET_NAME = 'FUSION.FUSIONTABLEUPDATEDEMO'
and JRN.WINDOW_ID > SUB.MIN_WINDOW_ID
and JRN.WINDOW_ID <= SUB.MAX_WINDOW_ID_DEL
group by JRN.ID,SUB.CDC_SUBSCRIBER, SUB.MAX_WINDOW_ID_INS
18. 向 SNP_CDC_OBJECTS 表中插入数据,数据为添加日记的表和新建的视图
insert into FUSION.SNP_CDC_OBJECTS
(
FULL_TABLE_NAME,
CDC_OBJECT_TYPE,
FULL_OBJECT_NAME,
DB_OBJECT_TYPE
)
values
(
/* FULL_TABLE_NAME */ 'FUSION.DMS_TEST_DELET_SOURCE',
/* CDC_OBJECT_TYPE */ 'JRN_FULL_VIEW',
/* FULL_OBJECT_NAME */ 'FUSION.JV$DMS_TEST_DELET_SOURCE',
/* DB_OBJECT_TYPE */ 'VIEW'
)
19. 创建视图 DDMS_TEST_DELET_SOURCE
create or replace view FUSION.JV$DDMS_TEST_DELET_SOURCE /* This view is created in the work physical schema of the current Data server */
as
select
decode(TARG.ROWID, null, 'D', 'I') AS JRN_FLAG,
sysdate AS JRN_DATE, /* For V3 compatibility */
SUB.CDC_SUBSCRIBER AS JRN_SUBSCRIBER, /* Renamed for V3 compatibility */
PKS.ID AS ID
,TARG.HOST AS HOST,
TARG.PORT AS PORT,
TARG.NAME AS NAME,
TARG.APPLICATION AS APPLICATION,
TARG.CONTEXT_ROOT AS CONTEXT_ROOT,
TARG.SERVICE AS SERVICE,
TARG.PATH AS PATH,
TARG.DEPLOYMENT_STATE AS DEPLOYMENT_STATE,
TARG.SOURCE_INFORMATION AS SOURCE_INFORMATION,
TARG.ACTIVE_SERVER_COUNT AS ACTIVE_SERVER_COUNT,
TARG.SERVLETS AS SERVLETS,
TARG.PENDING_REQUESTS AS PENDING_REQUESTS,
TARG.COMPLETED_REQUESTS AS COMPLETED_REQUESTS,
TARG.OPEN_SESSIONS_CURRENT_COUNT AS OPEN_SESSIONS_CURRENT_COUNT,
TARG.OPEN_SESSIONS_HIGH_COUNT AS OPEN_SESSIONS_HIGH_COUNT,
TARG.SESSIONS_OPENED_TOTAL_COUNT AS SESSIONS_OPENED_TOTAL_COUNT,
TARG.OBJECT_VERSION_NUMBER AS OBJECT_VERSION_NUMBER,
TARG.CREATION_DATE AS CREATION_DATE,
TARG.CREATED_BY AS CREATED_BY,
TARG.LAST_UPDATE_DATE AS LAST_UPDATE_DATE,
TARG.LAST_UPDATED_BY AS LAST_UPDATED_BY,
TARG.DATA_STATUS AS DATA_STATUS
from (
select distinct
PKS_UNION.ID,PKS_UNION.WINDOW_ID
from (
select
JRN.ID,JRN.WINDOW_ID
from
FUSION.J$DMS_TEST_DELET_SOURCE JRN
union
select
TARG.ID,null
from
FUSION.DMS_TEST_DELET_SOURCE TARG,
FUSION.SNP_CDC_SET_TABLE CDCTAB
where
TARG.LAST_UPDATE_DATE >= CDCTAB.CDC_LAST_DATE
and CDCTAB.CDC_SET_NAME = 'FUSION.FUSIONTABLEUPDATEDEMO'
and CDCTAB.FULL_TABLE_NAME = 'FUSION.DMS_TEST_DELET_SOURCE'
) PKS_UNION
) PKS,
FUSION.SNP_CDC_SUBS SUB,
FUSION.DMS_TEST_DELET_SOURCE TARG
where PKS.ID = TARG.ID (+)
and SUB.CDC_SET_NAME = 'FUSION.FUSIONTABLEUPDATEDEMO'
and (PKS.WINDOW_ID > SUB.MIN_WINDOW_ID or PKS.WINDOW_ID is null)
20. 向 SNP_CDC_OBJECTS 插入数据,数据为添加日记的表和 JV$DDMS_TEST_DELET_SOURCE
insert into FUSION.SNP_CDC_OBJECTS
(
FULL_TABLE_NAME,
CDC_OBJECT_TYPE,
FULL_OBJECT_NAME,
DB_OBJECT_TYPE
)
values
(
/* FULL_TABLE_NAME */ 'FUSION.DMS_TEST_DELET_SOURCE',
/* CDC_OBJECT_TYPE */ 'JRN_FULL_DATA_VIEW',
/* FULL_OBJECT_NAME */ 'FUSION.JV$DDMS_TEST_DELET_SOURCE',
/* DB_OBJECT_TYPE */ 'VIEW'
)
21. 创建删除 Trigger
triggerCmd = """
create or replace trigger FUSION.T$DMS_TEST_DELET_SOURCE
after delete on FUSION.DMS_TEST_DELET_SOURCE
for each row
declare
V_ID NUMBER;
begin
V_ID := :old.ID;
insert into FUSION.J$DMS_TEST_DELET_SOURCE
(
WINDOW_ID,
ID
)
values
(
/* WINDOW_ID */ NULL,
:old.ID
);
end;
"""
# Create the statement
myStmt = myCon.createStatement()
# Execute the trigger creation
myStmt.execute(triggerCmd)
myStmt.close()
myStmt = None
# Commit, just in case
myCon.commit()
22. 向表 SNP_CDC_OBJECTS 中插入数据,内容为添加日记的表和 T$DMS_TEST_DELET_SOURCE
insert into FUSION.SNP_CDC_OBJECTS
(
FULL_TABLE_NAME,
CDC_OBJECT_TYPE,
FULL_OBJECT_NAME,
DB_OBJECT_TYPE
)
values
(
/* FULL_TABLE_NAME */ 'FUSION.DMS_TEST_DELET_SOURCE',
/* CDC_OBJECT_TYPE */ 'JRN_FULL_TRIGGER',
/* FULL_OBJECT_NAME */ 'FUSION.T$DMS_TEST_DELET_SOURCE',
/* DB_OBJECT_TYPE */ 'TRIGGER'
)
23. 删除 SNP_CDC_SET_TABLE 中的 FUSION.FUSIONTABLEUPDATEDEMO
delete from FUSION.SNP_CDC_SET_TABLE CDC
where CDC.CDC_SET_NAME = 'FUSION.FUSIONTABLEUPDATEDEMO'
and CDC.FULL_TABLE_NAME = 'FUSION.DMS_TEST_DELET_SOURCE'
24. 向 SNP_CDC_SET_TABLE 中插入数据
insert into FUSION.SNP_CDC_SET_TABLE
(
CDC_SET_NAME,
FULL_TABLE_NAME,
FULL_DATA_VIEW,
RETRIEVE_DATA,
REFRESH_ROW_COUNT,
CDC_LAST_DATE,
CDC_CURRENT_DATE
)
values
(
/* CDC_SET_NAME */ 'FUSION.FUSIONTABLEUPDATEDEMO',
/* FULL_TABLE_NAME */ 'FUSION.DMS_TEST_DELET_SOURCE',
/* FULL_DATA_VIEW */ 'FUSION.JV$DDMS_TEST_DELET_SOURCE',
/* RETRIEVE_DATA */ NULL,
/* REFRESH_ROW_COUNT */ 'update FUSION.SNP_CDC_SUBS SUBS set SUBS.CDC_ROW_COUNT = (select SUBS.CDC_ROW_COUNT + count(*) from FUSION.JV$DDMS_TEST_DELET_SOURCE where JRN_SUBSCRIBER =''$$SUBSCRIBER_NAME$$'') where CDC_SET_NAME =''FUSION.FUSIONTABLEUPDATEDEMO''and CDC_SUBSCRIBER=''$$SUBSCRIBER_NAME$$''',
sysdate,
sysdate
)
25. 删除临时表 SNP_TMP_TABLE_LIST
drop table FUSION.SNP_TMP_TABLE_LIST
26. 锁定 CDC 记录
/* Mandatory to prevent OdiRetrieveData or other Extend window in parallel */
update FUSION.SNP_CDC_SET CDC
set CDC.CUR_WINDOW_ID = CDC.CUR_WINDOW_ID + 2,
CDC.CUR_WINDOW_ID_INS = CDC.CUR_WINDOW_ID + 1,
CDC.CUR_WINDOW_ID_DEL = CDC.CUR_WINDOW_ID + 2
where CDC.CDC_SET_NAME = 'FUSION.FUSIONTABLEUPDATEDEMO'
27. 设置 CDC CurrentDate
update FUSION.SNP_CDC_SET_TABLE
set CDC_CURRENT_DATE = (select nvl(min(SCN_TO_TIMESTAMP(TR.start_scnb)),SYSTIMESTAMP-6/86400) cur_time
from gv$transaction TR,
gv$session SE,
gv$lock LK,
all_objects OB
where TR.addr = SE.taddr
and SE.sid = LK.sid
and LK.id1 = OB.object_id
and OB.owner || '.' || OB.object_name = 'FUSION.DMS_TEST_DELET_SOURCE'
)
where CDC_SET_NAME = 'FUSION.FUSIONTABLEUPDATEDEMO'
and FULL_TABLE_NAME = 'FUSION.DMS_TEST_DELET_SOURCE'
28. 向 J$DMS_TEST_DELET_SOURCE 表插入数据
/* Compute Window Id for each table of the CDC Set from children to parents */
/* This computes consistency regarding Inserts and Updates */
/* Since there are no triggers, the J$ table is populated by the extend window step... */
insert into FUSION.J$DMS_TEST_DELET_SOURCE
(
WINDOW_ID,
ID
)
select
/* WINDOW_ID */ CDCSET.CUR_WINDOW_ID + 1,
TARG.ID
from
FUSION.DMS_TEST_DELET_SOURCE TARG,
(
-- Get current window id
select CUR_WINDOW_ID
from FUSION.SNP_CDC_SET
where CDC_SET_NAME = 'FUSION.FUSIONTABLEUPDATEDEMO'
) CDCSET,
FUSION.SNP_CDC_SET_TABLE CDCTAB
where
TARG.LAST_UPDATE_DATE >= CDCTAB.CDC_LAST_DATE
and CDCTAB.CDC_SET_NAME = 'FUSION.FUSIONTABLEUPDATEDEMO'
and CDCTAB.FULL_TABLE_NAME = 'FUSION.DMS_TEST_DELET_SOURCE'
29. 更新 FUSION.SNP_CDC_SET_TABLE
update FUSION.SNP_CDC_SET_TABLE
set CDC_LAST_DATE = CDC_CURRENT_DATE
where CDC_SET_NAME = 'FUSION.FUSIONTABLEUPDATEDEMO'
and FULL_TABLE_NAME = 'FUSION.DMS_TEST_DELET_SOURCE'
30. 解锁 CDC
/* commit */
31.
/* After each change on the content of this table we recompute the statistics */
/* Statistics are always up to date on this table */
begin
dbms_stats.gather_table_stats(
ownname => 'FUSION',
tabname => 'SNP_CDC_SET_TABLE',
estimate_percent => dbms_stats.auto_sample_size
);
end;
32. 关闭连接
myCon.close()
扩展窗口
1. 校验版本
DECLARE -- Validating KM options
dbVersion1 NUMBER(2,0);
dbVersion2 NUMBER(2,0);
BEGIN
-- Verify RDBMS version
select
case when upper(name) = 'COMPATIBLE' then to_number(substr(value, 1, instr(value, '.',1,1)-1)) else null end v0,
case when upper(name) = 'COMPATIBLE' then to_number(substr(value, instr(value, '.',1,1)+1, instr(value, '.',1,2)-instr(value, '.',1,1)-1)) else null end v1
into dbVersion1, dbVersion2
from v$parameter
where upper(name) = 'COMPATIBLE';
IF dbVersion1 < 9 THEN
raise_application_error(-20101, 'ODIKM-ORA-10000: You are using RDBMS version'||dbVersion1||'.'||dbVersion2||'. This version does not support JKM COMPATIBLE= 9.');
END IF;
END;
2. 更新 CDC 数据
/* Mandatory to prevent OdiRetrieveData or other Extend window in parallel */
update FUSION.SNP_CDC_SET CDC
set CDC.CUR_WINDOW_ID = CDC.CUR_WINDOW_ID + 2,
CDC.CUR_WINDOW_ID_INS = CDC.CUR_WINDOW_ID + 1,
CDC.CUR_WINDOW_ID_DEL = CDC.CUR_WINDOW_ID + 2
where CDC.CDC_SET_NAME = 'FUSION.FUSIONTABLEUPDATEDEMO'
3. 更新 SNP_CDC_SET_TABLE 数据
update FUSION.SNP_CDC_SET_TABLE 数据
set CDC_CURRENT_DATE = (select nvl(min(SCN_TO_TIMESTAMP(TR.start_scnb)),SYSTIMESTAMP-6/86400) cur_time
from gv$transaction TR,
gv$session SE,
gv$lock LK,
all_objects OB
where TR.addr = SE.taddr
and SE.sid = LK.sid
and LK.id1 = OB.object_id
and OB.owner || '.' || OB.object_name = 'FUSION.DMS_TEST_DELET_SOURCE'
)
where CDC_SET_NAME = 'FUSION.FUSIONTABLEUPDATEDEMO'
and FULL_TABLE_NAME = 'FUSION.DMS_TEST_DELET_SOURCE'
4. 向 FUSION.J$DMS_TEST_DELET_SOURCE 表中插入数据
/* Compute Window Id for each table of the CDC Set from children to parents */
/* This computes consistency regarding Inserts and Updates */
/* Since there are no triggers, the J$ table is populated by the extend window step... */
insert into FUSION.J$DMS_TEST_DELET_SOURCE
(
WINDOW_ID,
ID
)
select
/* WINDOW_ID */ CDCSET.CUR_WINDOW_ID + 1,
TARG.ID
from
FUSION.DMS_TEST_DELET_SOURCE TARG,
(
-- Get current window id
select CUR_WINDOW_ID
from FUSION.SNP_CDC_SET
where CDC_SET_NAME = 'FUSION.FUSIONTABLEUPDATEDEMO'
) CDCSET,
FUSION.SNP_CDC_SET_TABLE CDCTAB
where
TARG.LAST_UPDATE_DATE >= CDCTAB.CDC_LAST_DATE
and CDCTAB.CDC_SET_NAME = 'FUSION.FUSIONTABLEUPDATEDEMO'
and CDCTAB.FULL_TABLE_NAME = 'FUSION.DMS_TEST_DELET_SOURCE'
5. 更新 J$DMS_TEST_DELET_SOURCE
/* Compute Window Id for each table of the CDC Set from parents to children */
/* This computes consistency regarding Deletes */
update FUSION.J$DMS_TEST_DELET_SOURCE JRN
set JRN.WINDOW_ID = (
select CDC.CUR_WINDOW_ID_DEL + 2
from FUSION.SNP_CDC_SET CDC
where CDC.CDC_SET_NAME = 'FUSION.FUSIONTABLEUPDATEDEMO'
)
where JRN.WINDOW_ID is null
6. 更新 SNP_CDC_SET_TABLE 表中 CDC 上次更新时间
update FUSION.SNP_CDC_SET_TABLE
set CDC_LAST_DATE = CDC_CURRENT_DATE
where CDC_SET_NAME = 'FUSION.FUSIONTABLEUPDATEDEMO'
and FULL_TABLE_NAME = 'FUSION.DMS_TEST_DELET_SOURCE'
7. 解锁 CDC
/* commit */
8.
begin
dbms_stats.gather_table_stats(
ownname => 'FUSION',
tabname => 'J$DMS_TEST_DELET_SOURCE',
estimate_percent => dbms_stats.auto_sample_size
);
end;
9.
myCon.close()
执行以下代码
update FUSION.SNP_CDC_SET CDC
set CDC.CUR_WINDOW_ID = CDC.CUR_WINDOW_ID + 2,
CDC.CUR_WINDOW_ID_INS = CDC.CUR_WINDOW_ID + 1,
CDC.CUR_WINDOW_ID_DEL = CDC.CUR_WINDOW_ID + 2
where CDC.CDC_SET_NAME = 'FUSION.FUSIONTABLEUPDATEDEMO'
将 FUSION.SNP_CDC_SET 中字段更新
更新 FUSION.SNP_CDC_SET_TABLE 设置 CDC_CURRENT_DATE 为当前时间
向 FUSION.J$DMS_TEST_DELET_SOURCE 表中插入数据,值为从 FUSION.SNP_CDC_SET 取出的 FUSION.FUSIONTABLEUPDATEDEMO 数据 CDCSET.CUR_WINDOW_ID + 1, 条件是 TARG.LAST_UPDATE_DATE(目标表)的上次更新时间晚于 FUSION.SNP_CDC_SET_TABLE 的上次操作时间,TARG.LAST_UPDATE_DATE(目标表)的 CDC_LAST_DATE 会在开启日记时更新为当前时间,然后在每次扩展窗口时也更新,更新操作在向 FUSION.J$DMS_TEST_DELET_SOURCE 表中插入数据之后
意思就是如果上次更新时间早于 CDC 上次操作时间,则此条记录不会被捕获
insert into FUSION.J$DMS_TEST_DELET_SOURCE
(
WINDOW_ID,
ID
)
select
/* WINDOW_ID */ CDCSET.CUR_WINDOW_ID + 1,
TARG.ID
from
FUSION.DMS_TEST_DELET_SOURCE TARG,
(
-- Get current window id
select CUR_WINDOW_ID
from FUSION.SNP_CDC_SET
where CDC_SET_NAME = 'FUSION.FUSIONTABLEUPDATEDEMO'
) CDCSET,
FUSION.SNP_CDC_SET_TABLE CDCTAB
where
TARG.LAST_UPDATE_DATE >= CDCTAB.CDC_LAST_DATE
and CDCTAB.CDC_SET_NAME = 'FUSION.FUSIONTABLEUPDATEDEMO'
and CDCTAB.FULL_TABLE_NAME = 'FUSION.DMS_TEST_DELET_SOURCE'
更新 FUSION.J$DMS_TEST_DELET_SOURCE,设置没有 JRN.WINDOW_ID 的 WINDOW_ID
/* Compute Window Id for each table of the CDC Set from parents to children */
/* This computes consistency regarding Deletes */
update FUSION.J$DMS_TEST_DELET_SOURCE JRN
set JRN.WINDOW_ID = (
select CDC.CUR_WINDOW_ID_DEL + 2
from FUSION.SNP_CDC_SET CDC
where CDC.CDC_SET_NAME = 'FUSION.FUSIONTABLEUPDATEDEMO'
)
where JRN.WINDOW_ID is null
设置 CDC_LAST_DATE 为 CDC_CURRENT_DATE,上面提到的更新 CDC_LAST_DATE
update FUSION.SNP_CDC_SET_TABLE
set CDC_LAST_DATE 为 = CDC_CURRENT_DATE
where CDC_SET_NAME = 'FUSION.FUSIONTABLEUPDATEDEMO'
and FULL_TABLE_NAME = 'FUSION.DMS_TEST_DELET_SOURCE'
锁定订户
1. 校验版本
DECLARE -- Validating KM options
dbVersion1 NUMBER(2,0);
dbVersion2 NUMBER(2,0);
BEGIN
-- Verify RDBMS version
select
case when upper(name) = 'COMPATIBLE' then to_number(substr(value, 1, instr(value, '.',1,1)-1)) else null end v0,
case when upper(name) = 'COMPATIBLE' then to_number(substr(value, instr(value, '.',1,1)+1, instr(value, '.',1,2)-instr(value, '.',1,1)-1)) else null end v1
into dbVersion1, dbVersion2
from v$parameter
where upper(name) = 'COMPATIBLE';
IF dbVersion1 < 9 THEN
raise_application_error(-20101, 'ODIKM-ORA-10000: You are using RDBMS version'||dbVersion1||'.'||dbVersion2||'. This version does not support JKM COMPATIBLE= 9.');
END IF;
END;
2. 获取连接
import java.sql as sql
import java.lang as lang
# Connect to the Source Database ie the one that hosts the journalized tables
myCon = odiRef.getJDBCConnection("SRC")
# Create the list of Subscribers for a CDC Set
lstSbs = []
lstSbs.append('Zero')
3.
updCmd = """
update FUSION.SNP_CDC_SUBS SUBS
set (
SUBS.MAX_WINDOW_ID,
SUBS.MAX_WINDOW_ID_INS,
SUBS.MAX_WINDOW_ID_DEL
) = (
select CDC.CUR_WINDOW_ID,
CDC.CUR_WINDOW_ID_INS,
CDC.CUR_WINDOW_ID_DEL
from FUSION.SNP_CDC_SET CDC
where CDC.CDC_SET_NAME = 'FUSION.FUSIONTABLEUPDATEDEMO'
)
where SUBS.CDC_SET_NAME = 'FUSION.FUSIONTABLEUPDATEDEMO'
and SUBS.CDC_SUBSCRIBER = ?
"""
# Prepare the update
prepUpdStmt = myCon.prepareStatement(updCmd)
# Execute it for each subscriber
for sbs in lstSbs:
prepUpdStmt.setString(1, sbs)
nbRows = prepUpdStmt.executeUpdate()
prepUpdStmt.close()
# Commit the updates
myCon.commit()
4. 关闭连接
myCon.close()
updCmd = “””
update FUSION.SNP_CDC_SUBS SUBS
set (
SUBS.MAX_WINDOW_ID,
SUBS.MAX_WINDOW_ID_INS,
SUBS.MAX_WINDOW_ID_DEL
) = (
select CDC.CUR_WINDOW_ID,
CDC.CUR_WINDOW_ID_INS,
CDC.CUR_WINDOW_ID_DEL
from FUSION.SNP_CDC_SET CDC
where CDC.CDC_SET_NAME = 'FUSION.FUSIONTABLEUPDATEDEMO'
)
where SUBS.CDC_SET_NAME = 'FUSION.FUSIONTABLEUPDATEDEMO'
and SUBS.CDC_SUBSCRIBER = ?
"""
//Prepare the update
prepUpdStmt = myCon.prepareStatement(updCmd)
//Execute it for each subscriber
for sbs in lstSbs:// 遍历监听者列表,对所有监听者做更新操作
prepUpdStmt.setString(1, sbs)// 设置占位符
nbRows = prepUpdStmt.executeUpdate()// 执行更新操作
prepUpdStmt.close()
// Commit the updates
myCon.commit()
更新 FUSION.SNP_CDC_SUBS 参数,设置 FUSION.SNP_CDC_SET CUR 参数为 FUSION.SNP_CDC_SUBS MAX 参数
select
decode(TARG.ROWID, null, 'D', 'I') AS JRN_FLAG,
sysdate AS JRN_DATE,
JRN.CDC_SUBSCRIBER AS JRN_SUBSCRIBER,
JRN.ID AS ID
,TARG.HOST AS HOST,
TARG.PORT AS PORT,
TARG.NAME AS NAME,
TARG.APPLICATION AS APPLICATION,
TARG.CONTEXT_ROOT AS CONTEXT_ROOT,
TARG.SERVICE AS SERVICE,
TARG.PATH AS PATH,
TARG.DEPLOYMENT_STATE AS DEPLOYMENT_STATE,
TARG.SOURCE_INFORMATION AS SOURCE_INFORMATION,
TARG.ACTIVE_SERVER_COUNT AS ACTIVE_SERVER_COUNT,
TARG.SERVLETS AS SERVLETS,
TARG.PENDING_REQUESTS AS PENDING_REQUESTS,
TARG.COMPLETED_REQUESTS AS COMPLETED_REQUESTS,
TARG.OPEN_SESSIONS_CURRENT_COUNT AS OPEN_SESSIONS_CURRENT_COUNT,
TARG.OPEN_SESSIONS_HIGH_COUNT AS OPEN_SESSIONS_HIGH_COUNT,
TARG.SESSIONS_OPENED_TOTAL_COUNT AS SESSIONS_OPENED_TOTAL_COUNT,
TARG.OBJECT_VERSION_NUMBER AS OBJECT_VERSION_NUMBER,
TARG.CREATION_DATE AS CREATION_DATE,
TARG.CREATED_BY AS CREATED_BY,
TARG.LAST_UPDATE_DATE AS LAST_UPDATE_DATE,
TARG.LAST_UPDATED_BY AS LAST_UPDATED_BY,
TARG.DATA_STATUS AS DATA_STATUS
from (select JRN.ID,SUB.CDC_SUBSCRIBER, SUB.MAX_WINDOW_ID_INS, max(JRN.WINDOW_ID) WINDOW_ID
from FUSION.J$DMS_TEST_DELET_SOURCE JRN,
FUSION.SNP_CDC_SUBS SUB
where SUB.CDC_SET_NAME = 'FUSION.FUSIONTABLEUPDATEDEMO'
and JRN.WINDOW_ID > SUB.MIN_WINDOW_ID
and JRN.WINDOW_ID <= SUB.MAX_WINDOW_ID_DEL
group by JRN.ID,SUB.CDC_SUBSCRIBER, SUB.MAX_WINDOW_ID_INS) JRN,
FUSION.DMS_TEST_DELET_SOURCE TARG
where
JRN.ID = TARG.ID (+)--TARG 内被删除的查询不出来
and not (
TARG.ROWID is not null-- 不是删除操作
and JRN.WINDOW_ID > JRN.MAX_WINDOW_ID_INS
);
TARG.ROWID 为 null 则说明单据被删除
执行接口
1. 删除表 FUSION.C$_0SOURCE
drop table FUSION.C$_0SOURCE purge
2. 创建表 FUSION.C$_0SOURCE work table
create table FUSION.C$_0SOURCE
(
ID NUMBER NULL,
HOST VARCHAR2(200) NULL,
PORT VARCHAR2(200) NULL,
NAME VARCHAR2(200) NULL,
APPLICATION VARCHAR2(200) NULL,
CONTEXT_ROOT VARCHAR2(200) NULL,
SERVICE VARCHAR2(200) NULL,
PATH VARCHAR2(4000) NULL,
DEPLOYMENT_STATE VARCHAR2(200) NULL,
SOURCE_INFORMATION VARCHAR2(200) NULL,
ACTIVE_SERVER_COUNT NUMBER NULL,
SERVLETS NUMBER NULL,
PENDING_REQUESTS NUMBER NULL,
COMPLETED_REQUESTS NUMBER NULL,
OPEN_SESSIONS_CURRENT_COUNT NUMBER NULL,
OPEN_SESSIONS_HIGH_COUNT NUMBER NULL,
SESSIONS_OPENED_TOTAL_COUNT NUMBER NULL,
OBJECT_VERSION_NUMBER NUMBER NULL,
CREATION_DATE DATE NULL,
CREATED_BY VARCHAR2(200) NULL,
LAST_UPDATE_DATE DATE NULL,
LAST_UPDATED_BY VARCHAR2(200) NULL,
DATA_STATUS VARCHAR2(200) NULL,
JRN_SUBSCRIBER VARCHAR2(50 CHAR) NULL,
JRN_FLAG CHAR(1) NULL,
JRN_DATE DATE NULL
)
NOLOGGING
3. 读取数据,源执行查询取到监听者监听的数据,目标执行插入,将数据插入到目标表
源执行
select
SOURCE.ID AS ID,
SOURCE.HOST AS HOST,
SOURCE.PORT AS PORT,
SOURCE.NAME AS NAME,
SOURCE.APPLICATION AS APPLICATION,
SOURCE.CONTEXT_ROOT AS CONTEXT_ROOT,
SOURCE.SERVICE AS SERVICE,
SOURCE.PATH AS PATH,
SOURCE.DEPLOYMENT_STATE AS DEPLOYMENT_STATE,
SOURCE.SOURCE_INFORMATION AS SOURCE_INFORMATION,
SOURCE.ACTIVE_SERVER_COUNT AS ACTIVE_SERVER_COUNT,
SOURCE.SERVLETS AS SERVLETS,
SOURCE.PENDING_REQUESTS AS PENDING_REQUESTS,
SOURCE.COMPLETED_REQUESTS AS COMPLETED_REQUESTS,
SOURCE.OPEN_SESSIONS_CURRENT_COUNT AS OPEN_SESSIONS_CURRENT_COUNT,
SOURCE.OPEN_SESSIONS_HIGH_COUNT AS OPEN_SESSIONS_HIGH_COUNT,
SOURCE.SESSIONS_OPENED_TOTAL_COUNT AS SESSIONS_OPENED_TOTAL_COUNT,
SOURCE.OBJECT_VERSION_NUMBER AS OBJECT_VERSION_NUMBER,
SOURCE.CREATION_DATE AS CREATION_DATE,
SOURCE.CREATED_BY AS CREATED_BY,
SOURCE.LAST_UPDATE_DATE AS LAST_UPDATE_DATE,
SOURCE.LAST_UPDATED_BY AS LAST_UPDATED_BY,
SOURCE.DATA_STATUS AS DATA_STATUS,
JRN_SUBSCRIBER AS JRN_SUBSCRIBER,
JRN_FLAG AS JRN_FLAG,
JRN_DATE AS JRN_DATE
from FUSION.JV$DMS_TEST_DELET_SOURCE SOURCE
where (1=1)
AND JRN_SUBSCRIBER = 'Zero' /* AND JRN_DATE < sysdate */
目标执行
insert into FUSION.C$_0SOURCE
(
ID,
HOST,
PORT,
NAME,
APPLICATION,
CONTEXT_ROOT,
SERVICE,
PATH,
DEPLOYMENT_STATE,
SOURCE_INFORMATION,
ACTIVE_SERVER_COUNT,
SERVLETS,
PENDING_REQUESTS,
COMPLETED_REQUESTS,
OPEN_SESSIONS_CURRENT_COUNT,
OPEN_SESSIONS_HIGH_COUNT,
SESSIONS_OPENED_TOTAL_COUNT,
OBJECT_VERSION_NUMBER,
CREATION_DATE,
CREATED_BY,
LAST_UPDATE_DATE,
LAST_UPDATED_BY,
DATA_STATUS,
JRN_SUBSCRIBER,
JRN_FLAG,
JRN_DATE
)
values
(
:ID,
:HOST,
:PORT,
:NAME,
:APPLICATION,
:CONTEXT_ROOT,
:SERVICE,
:PATH,
:DEPLOYMENT_STATE,
:SOURCE_INFORMATION,
:ACTIVE_SERVER_COUNT,
:SERVLETS,
:PENDING_REQUESTS,
:COMPLETED_REQUESTS,
:OPEN_SESSIONS_CURRENT_COUNT,
:OPEN_SESSIONS_HIGH_COUNT,
:SESSIONS_OPENED_TOTAL_COUNT,
:OBJECT_VERSION_NUMBER,
:CREATION_DATE,
:CREATED_BY,
:LAST_UPDATE_DATE,
:LAST_UPDATED_BY,
:DATA_STATUS,
:JRN_SUBSCRIBER,
:JRN_FLAG,
:JRN_DATE
)
4.
BEGIN
DBMS_STATS.GATHER_TABLE_STATS (
ownname => 'FUSION',
tabname => 'C$_0SOURCE',
estimate_percent => DBMS_STATS.AUTO_SAMPLE_SIZE
);
END;
5. 清理日记 (这又是什么操作)
null
6. 删除表 FUSION.I$_DMS_TEST_DELET_TARGET
drop table FUSION.I$_DMS_TEST_DELET_TARGET
7. 无日志模式创建表 FUSION.I$_DMS_TEST_DELET_TARGET
create table FUSION.I$_DMS_TEST_DELET_TARGET
(
ID NUMBER NULL,
HOST VARCHAR2(200) NULL,
PORT VARCHAR2(200) NULL,
NAME VARCHAR2(200) NULL,
APPLICATION VARCHAR2(200) NULL,
CONTEXT_ROOT VARCHAR2(200) NULL,
SERVICE VARCHAR2(200) NULL,
PATH VARCHAR2(4000) NULL,
DEPLOYMENT_STATE VARCHAR2(200) NULL,
SOURCE_INFORMATION VARCHAR2(200) NULL,
ACTIVE_SERVER_COUNT NUMBER NULL,
SERVLETS NUMBER NULL,
PENDING_REQUESTS NUMBER NULL,
COMPLETED_REQUESTS NUMBER NULL,
OPEN_SESSIONS_CURRENT_COUNT NUMBER NULL,
OPEN_SESSIONS_HIGH_COUNT NUMBER NULL,
SESSIONS_OPENED_TOTAL_COUNT NUMBER NULL,
OBJECT_VERSION_NUMBER NUMBER NULL,
CREATION_DATE DATE NULL,
CREATED_BY VARCHAR2(200) NULL,
LAST_UPDATE_DATE DATE NULL,
LAST_UPDATED_BY VARCHAR2(200) NULL,
DATA_STATUS VARCHAR2(200) NULL,
IND_UPDATE CHAR(1)
)
NOLOGGING -- 无日志
8. 向 I$_DMS_TEST_DELET_TARGET 表插入数据,数据从 FUSION.C$_0SOURCE 表中取,并去除已存在的数据
/* DETECTION_STRATEGY = NOT_EXISTS */
insert into FUSION.I$_DMS_TEST_DELET_TARGET
(
ID,
HOST,
PORT,
NAME,
APPLICATION,
CONTEXT_ROOT,
SERVICE,
PATH,
DEPLOYMENT_STATE,
SOURCE_INFORMATION,
ACTIVE_SERVER_COUNT,
SERVLETS,
PENDING_REQUESTS,
COMPLETED_REQUESTS,
OPEN_SESSIONS_CURRENT_COUNT,
OPEN_SESSIONS_HIGH_COUNT,
SESSIONS_OPENED_TOTAL_COUNT,
OBJECT_VERSION_NUMBER,
CREATION_DATE,
CREATED_BY,
LAST_UPDATE_DATE,
LAST_UPDATED_BY,
DATA_STATUS,
IND_UPDATE
)
select
ID,
HOST,
PORT,
NAME,
APPLICATION,
CONTEXT_ROOT,
SERVICE,
PATH,
DEPLOYMENT_STATE,
SOURCE_INFORMATION,
ACTIVE_SERVER_COUNT,
SERVLETS,
PENDING_REQUESTS,
COMPLETED_REQUESTS,
OPEN_SESSIONS_CURRENT_COUNT,
OPEN_SESSIONS_HIGH_COUNT,
SESSIONS_OPENED_TOTAL_COUNT,
OBJECT_VERSION_NUMBER,
CREATION_DATE,
CREATED_BY,
LAST_UPDATE_DATE,
LAST_UPDATED_BY,
DATA_STATUS,
IND_UPDATE
from (
select
SOURCE_A.ID AS ID,
SOURCE_A.HOST AS HOST,
SOURCE_A.PORT AS PORT,
SOURCE_A.NAME AS NAME,
SOURCE_A.APPLICATION AS APPLICATION,
SOURCE_A.CONTEXT_ROOT AS CONTEXT_ROOT,
SOURCE_A.SERVICE AS SERVICE,
SOURCE_A.PATH AS PATH,
SOURCE_A.DEPLOYMENT_STATE AS DEPLOYMENT_STATE,
SOURCE_A.SOURCE_INFORMATION AS SOURCE_INFORMATION,
SOURCE_A.ACTIVE_SERVER_COUNT AS ACTIVE_SERVER_COUNT,
SOURCE_A.SERVLETS AS SERVLETS,
SOURCE_A.PENDING_REQUESTS AS PENDING_REQUESTS,
SOURCE_A.COMPLETED_REQUESTS AS COMPLETED_REQUESTS,
SOURCE_A.OPEN_SESSIONS_CURRENT_COUNT AS OPEN_SESSIONS_CURRENT_COUNT,
SOURCE_A.OPEN_SESSIONS_HIGH_COUNT AS OPEN_SESSIONS_HIGH_COUNT,
SOURCE_A.SESSIONS_OPENED_TOTAL_COUNT AS SESSIONS_OPENED_TOTAL_COUNT,
SOURCE_A.OBJECT_VERSION_NUMBER AS OBJECT_VERSION_NUMBER,
SOURCE_A.CREATION_DATE AS CREATION_DATE,
SOURCE_A.CREATED_BY AS CREATED_BY,
SOURCE_A.LAST_UPDATE_DATE AS LAST_UPDATE_DATE,
SOURCE_A.LAST_UPDATED_BY AS LAST_UPDATED_BY,
SOURCE_A.DATA_STATUS AS DATA_STATUS,
JRN_FLAG IND_UPDATE
from FUSION.C$_0SOURCE SOURCE_A
where (1=1)
) S
where NOT EXISTS
( select 1 from FUSION.DMS_TEST_DELET_TARGET T
where T.ID = S.ID
and ((T.HOST = S.HOST) or (T.HOST IS NULL and S.HOST IS NULL)) and
((T.PORT = S.PORT) or (T.PORT IS NULL and S.PORT IS NULL)) and
((T.NAME = S.NAME) or (T.NAME IS NULL and S.NAME IS NULL)) and
((T.APPLICATION = S.APPLICATION) or (T.APPLICATION IS NULL and S.APPLICATION IS NULL)) and
((T.CONTEXT_ROOT = S.CONTEXT_ROOT) or (T.CONTEXT_ROOT IS NULL and S.CONTEXT_ROOT IS NULL)) and
((T.SERVICE = S.SERVICE) or (T.SERVICE IS NULL and S.SERVICE IS NULL)) and
((T.PATH = S.PATH) or (T.PATH IS NULL and S.PATH IS NULL)) and
((T.DEPLOYMENT_STATE = S.DEPLOYMENT_STATE) or (T.DEPLOYMENT_STATE IS NULL and S.DEPLOYMENT_STATE IS NULL)) and
((T.SOURCE_INFORMATION = S.SOURCE_INFORMATION) or (T.SOURCE_INFORMATION IS NULL and S.SOURCE_INFORMATION IS NULL)) and
((T.ACTIVE_SERVER_COUNT = S.ACTIVE_SERVER_COUNT) or (T.ACTIVE_SERVER_COUNT IS NULL and S.ACTIVE_SERVER_COUNT IS NULL)) and
((T.SERVLETS = S.SERVLETS) or (T.SERVLETS IS NULL and S.SERVLETS IS NULL)) and
((T.PENDING_REQUESTS = S.PENDING_REQUESTS) or (T.PENDING_REQUESTS IS NULL and S.PENDING_REQUESTS IS NULL)) and
((T.COMPLETED_REQUESTS = S.COMPLETED_REQUESTS) or (T.COMPLETED_REQUESTS IS NULL and S.COMPLETED_REQUESTS IS NULL)) and
((T.OPEN_SESSIONS_CURRENT_COUNT = S.OPEN_SESSIONS_CURRENT_COUNT) or (T.OPEN_SESSIONS_CURRENT_COUNT IS NULL and S.OPEN_SESSIONS_CURRENT_COUNT IS NULL)) and
((T.OPEN_SESSIONS_HIGH_COUNT = S.OPEN_SESSIONS_HIGH_COUNT) or (T.OPEN_SESSIONS_HIGH_COUNT IS NULL and S.OPEN_SESSIONS_HIGH_COUNT IS NULL)) and
((T.SESSIONS_OPENED_TOTAL_COUNT = S.SESSIONS_OPENED_TOTAL_COUNT) or (T.SESSIONS_OPENED_TOTAL_COUNT IS NULL and S.SESSIONS_OPENED_TOTAL_COUNT IS NULL)) and
((T.OBJECT_VERSION_NUMBER = S.OBJECT_VERSION_NUMBER) or (T.OBJECT_VERSION_NUMBER IS NULL and S.OBJECT_VERSION_NUMBER IS NULL)) and
((T.CREATION_DATE = S.CREATION_DATE) or (T.CREATION_DATE IS NULL and S.CREATION_DATE IS NULL)) and
((T.CREATED_BY = S.CREATED_BY) or (T.CREATED_BY IS NULL and S.CREATED_BY IS NULL)) and
((T.LAST_UPDATE_DATE = S.LAST_UPDATE_DATE) or (T.LAST_UPDATE_DATE IS NULL and S.LAST_UPDATE_DATE IS NULL)) and
((T.LAST_UPDATED_BY = S.LAST_UPDATED_BY) or (T.LAST_UPDATED_BY IS NULL and S.LAST_UPDATED_BY IS NULL)) and
((T.DATA_STATUS = S.DATA_STATUS) or (T.DATA_STATUS IS NULL and S.DATA_STATUS IS NULL))
)
9. 创建索引
create index FUSION.I$_DMS_TEST_DELET_TARGET_UK
on FUSION.I$_DMS_TEST_DELET_TARGET (ID)
NOLOGGING
10.
begin
dbms_stats.gather_table_stats(
ownname => 'FUSION',
tabname => 'I$_DMS_TEST_DELET_TARGET',
estimate_percent => dbms_stats.auto_sample_size
);
end;
11. 删除目标表标识符为 D 的
delete from FUSION.DMS_TEST_DELET_TARGET
where exists (
select 'X'
from FUSION.I$_DMS_TEST_DELET_TARGET I
where FUSION.DMS_TEST_DELET_TARGET.ID = I.ID
and IND_UPDATE = 'D'
)
12. 删除临时插入表中标识符为 D 的
delete from FUSION.I$_DMS_TEST_DELET_TARGET
where IND_UPDATE = 'D'
13. 创建校验表
create table FUSION.SNP_CHECK_TAB
(CATALOG_NAME VARCHAR2(100 CHAR) NULL ,
SCHEMA_NAME VARCHAR2(100 CHAR) NULL ,
RESOURCE_NAME VARCHAR2(100 CHAR) NULL,
FULL_RES_NAME VARCHAR2(100 CHAR) NULL,
ERR_TYPE VARCHAR2(1 CHAR) NULL,
ERR_MESS VARCHAR2(250 CHAR) NULL ,
CHECK_DATE DATE NULL,
ORIGIN VARCHAR2(250 CHAR) NULL,
CONS_NAME VARCHAR2(128 CHAR) NULL,
CONS_TYPE VARCHAR2(2 CHAR) NULL,
ERR_COUNT NUMBER(10) NULL
)
14. 删除 FUSION.SNP_CHECK_TAB 中数据
delete from FUSION.SNP_CHECK_TAB 中数据
where SCHEMA_NAME = 'FUSION'
and ORIGIN = '(91)mdsProject.DemoTestUpdateDate'
and ERR_TYPE = 'F'
15. 创建错误表
create table FUSION.E$_DMS_TEST_DELET_TARGET
(
ODI_ROW_ID UROWID,
ODI_ERR_TYPE VARCHAR2(1 CHAR) NULL,
ODI_ERR_MESS VARCHAR2(250 CHAR) NULL,
ODI_CHECK_DATE DATE NULL,
ID NUMBER NULL,
HOST VARCHAR2(200) NULL,
PORT VARCHAR2(200) NULL,
NAME VARCHAR2(200) NULL,
APPLICATION VARCHAR2(200) NULL,
CONTEXT_ROOT VARCHAR2(200) NULL,
SERVICE VARCHAR2(200) NULL,
PATH VARCHAR2(4000) NULL,
DEPLOYMENT_STATE VARCHAR2(200) NULL,
SOURCE_INFORMATION VARCHAR2(200) NULL,
ACTIVE_SERVER_COUNT NUMBER NULL,
SERVLETS NUMBER NULL,
PENDING_REQUESTS NUMBER NULL,
COMPLETED_REQUESTS NUMBER NULL,
OPEN_SESSIONS_CURRENT_COUNT NUMBER NULL,
OPEN_SESSIONS_HIGH_COUNT NUMBER NULL,
SESSIONS_OPENED_TOTAL_COUNT NUMBER NULL,
OBJECT_VERSION_NUMBER NUMBER NULL,
CREATION_DATE DATE NULL,
CREATED_BY VARCHAR2(200) NULL,
LAST_UPDATE_DATE DATE NULL,
LAST_UPDATED_BY VARCHAR2(200) NULL,
DATA_STATUS VARCHAR2(200) NULL,
ODI_ORIGIN VARCHAR2(250 CHAR) NULL,
ODI_CONS_NAME VARCHAR2(128 CHAR) NULL,
ODI_CONS_TYPE VARCHAR2(2 CHAR) NULL,
ODI_PK VARCHAR2(32 CHAR) PRIMARY KEY,
ODI_SESS_NO VARCHAR2(36 CHAR)
)
16. 删除错误表内上次数据
delete from FUSION.E$_DMS_TEST_DELET_TARGET
where (ODI_ERR_TYPE = 'S' and 'F' = 'S')
or (ODI_ERR_TYPE = 'F' and ODI_ORIGIN = '(91)mdsProject.DemoTestUpdateDate')
17. 创建索引
/* FLOW CONTROL CREATE THE INDEX ON I$TABLE */
create index FUSION.I$_DMS_TEST_DELET_TARGET_PK
on FUSION.I$_DMS_TEST_DELET_TARGET (ID)
18. 向表 FUSION.E$_DMS_TEST_DELET_TARGET 中插入 主键不唯一错误的数据
insert into FUSION.E$_DMS_TEST_DELET_TARGET
(
ODI_PK,
ODI_SESS_NO,
ODI_ROW_ID,
ODI_ERR_TYPE,
ODI_ERR_MESS,
ODI_ORIGIN,
ODI_CHECK_DATE,
ODI_CONS_NAME,
ODI_CONS_TYPE,
ID,
HOST,
PORT,
NAME,
APPLICATION,
CONTEXT_ROOT,
SERVICE,
PATH,
DEPLOYMENT_STATE,
SOURCE_INFORMATION,
ACTIVE_SERVER_COUNT,
SERVLETS,
PENDING_REQUESTS,
COMPLETED_REQUESTS,
OPEN_SESSIONS_CURRENT_COUNT,
OPEN_SESSIONS_HIGH_COUNT,
SESSIONS_OPENED_TOTAL_COUNT,
OBJECT_VERSION_NUMBER,
CREATION_DATE,
CREATED_BY,
LAST_UPDATE_DATE,
LAST_UPDATED_BY,
DATA_STATUS
)
select SYS_GUID(),
'28d4ac47-7a0a-408b-b050-9631a3fe5510',
rowid,
'F',
'ODI-15064: 主键 DMS_TEST_DELET_TARGET_PK 不是唯一的。',
'(91)mdsProject.DemoTestUpdateDate',
sysdate,
'DMS_TEST_DELET_TARGET_PK',
'PK',
TARGET.ID,
TARGET.HOST,
TARGET.PORT,
TARGET.NAME,
TARGET.APPLICATION,
TARGET.CONTEXT_ROOT,
TARGET.SERVICE,
TARGET.PATH,
TARGET.DEPLOYMENT_STATE,
TARGET.SOURCE_INFORMATION,
TARGET.ACTIVE_SERVER_COUNT,
TARGET.SERVLETS,
TARGET.PENDING_REQUESTS,
TARGET.COMPLETED_REQUESTS,
TARGET.OPEN_SESSIONS_CURRENT_COUNT,
TARGET.OPEN_SESSIONS_HIGH_COUNT,
TARGET.SESSIONS_OPENED_TOTAL_COUNT,
TARGET.OBJECT_VERSION_NUMBER,
TARGET.CREATION_DATE,
TARGET.CREATED_BY,
TARGET.LAST_UPDATE_DATE,
TARGET.LAST_UPDATED_BY,
TARGET.DATA_STATUS
from FUSION.I$_DMS_TEST_DELET_TARGET TARGET
where exists (
select SUB.ID
from FUSION.I$_DMS_TEST_DELET_TARGET SUB
where SUB.ID=TARGET.ID
group by SUB.ID
having count(1) > 1
)-- 检查主键重复
19. 向表 FUSION.E$_DMS_TEST_DELET_TARGET 中插入 主键为 null 错误的数据
insert into FUSION.E$_DMS_TEST_DELET_TARGET
(
ODI_PK,
ODI_SESS_NO,
ODI_ROW_ID,
ODI_ERR_TYPE,
ODI_ERR_MESS,
ODI_CHECK_DATE,
ODI_ORIGIN,
ODI_CONS_NAME,
ODI_CONS_TYPE,
ID,
HOST,
PORT,
NAME,
APPLICATION,
CONTEXT_ROOT,
SERVICE,
PATH,
DEPLOYMENT_STATE,
SOURCE_INFORMATION,
ACTIVE_SERVER_COUNT,
SERVLETS,
PENDING_REQUESTS,
COMPLETED_REQUESTS,
OPEN_SESSIONS_CURRENT_COUNT,
OPEN_SESSIONS_HIGH_COUNT,
SESSIONS_OPENED_TOTAL_COUNT,
OBJECT_VERSION_NUMBER,
CREATION_DATE,
CREATED_BY,
LAST_UPDATE_DATE,
LAST_UPDATED_BY,
DATA_STATUS
)
select
SYS_GUID(),
'28d4ac47-7a0a-408b-b050-9631a3fe5510',
rowid,
'F',
'ODI-15066: 列 ID 不能为空值。',
sysdate,
'(91)mdsProject.DemoTestUpdateDate',
'ID',
'NN',
ID,
HOST,
PORT,
NAME,
APPLICATION,
CONTEXT_ROOT,
SERVICE,
PATH,
DEPLOYMENT_STATE,
SOURCE_INFORMATION,
ACTIVE_SERVER_COUNT,
SERVLETS,
PENDING_REQUESTS,
COMPLETED_REQUESTS,
OPEN_SESSIONS_CURRENT_COUNT,
OPEN_SESSIONS_HIGH_COUNT,
SESSIONS_OPENED_TOTAL_COUNT,
OBJECT_VERSION_NUMBER,
CREATION_DATE,
CREATED_BY,
LAST_UPDATE_DATE,
LAST_UPDATED_BY,
DATA_STATUS
from FUSION.I$_DMS_TEST_DELET_TARGET
where ID is null
20. 为 FUSION.E$_DMS_TEST_DELET_TARGET 创建索引
/* FLOW CONTROL CREATE INDEX ON THE E$TABLE */
create index FUSION.E$_DMS_TEST_DELET_TARGET_IDX
on FUSION.E$_DMS_TEST_DELET_TARGET (ODI_ROW_ID)
21. 从插入表中删除错误行(错误表中存在的行)
delete from FUSION.I$_DMS_TEST_DELET_TARGET T
where exists (
select 1
from FUSION.E$_DMS_TEST_DELET_TARGET E
where ODI_SESS_NO = '28d4ac47-7a0a-408b-b050-9631a3fe5510'
and T.rowid = E.ODI_ROW_ID
)
22. 向 FUSION.SNP_CHECK_TAB 中插入数据
insert into FUSION.SNP_CHECK_TAB 中插入数据
(
SCHEMA_NAME,
RESOURCE_NAME,
FULL_RES_NAME,
ERR_TYPE,
ERR_MESS,
CHECK_DATE,
ORIGIN,
CONS_NAME,
CONS_TYPE,
ERR_COUNT
)
select
'FUSION',
'DMS_TEST_DELET_TARGET',
'FUSION.DMS_TEST_DELET_TARGET',
E.ODI_ERR_TYPE,
E.ODI_ERR_MESS,
E.ODI_CHECK_DATE,
E.ODI_ORIGIN,
E.ODI_CONS_NAME,
E.ODI_CONS_TYPE,
count(1)
from FUSION.E$_DMS_TEST_DELET_TARGET E
where E.ODI_ERR_TYPE = 'F'
and E.ODI_ORIGIN = '(91)mdsProject.DemoTestUpdateDate'
group by E.ODI_ERR_TYPE,
E.ODI_ERR_MESS,
E.ODI_CHECK_DATE,
E.ODI_ORIGIN,
E.ODI_CONS_NAME,
E.ODI_CONS_TYPE
23. 更新 FUSION.I$_DMS_TEST_DELET_TARGET 中 ID 在 FUSION.DMS_TEST_DELET_TARGET 中已存在的数据标识符为 U(Update),后面做更新处理
/* DETECTION_STRATEGY = NOT_EXISTS */
update FUSION.I$_DMS_TEST_DELET_TARGET
set IND_UPDATE = 'U'
where (ID)
in (
select ID
from FUSION.DMS_TEST_DELET_TARGET
)
24. 标识无效行
/* DETECTION_STRATEGY = NOT_EXISTS */
/* Command skiped due to chosen DETECTION_STRATEGY */
25.Disable constraints
26.Disable indexes
27. 更新已存在的行
/* DETECTION_STRATEGY = NOT_EXISTS */
update FUSION.DMS_TEST_DELET_TARGET T
set (
T.HOST,
T.PORT,
T.NAME,
T.APPLICATION,
T.CONTEXT_ROOT,
T.SERVICE,
T.PATH,
T.DEPLOYMENT_STATE,
T.SOURCE_INFORMATION,
T.ACTIVE_SERVER_COUNT,
T.SERVLETS,
T.PENDING_REQUESTS,
T.COMPLETED_REQUESTS,
T.OPEN_SESSIONS_CURRENT_COUNT,
T.OPEN_SESSIONS_HIGH_COUNT,
T.SESSIONS_OPENED_TOTAL_COUNT,
T.OBJECT_VERSION_NUMBER,
T.CREATION_DATE,
T.CREATED_BY,
T.LAST_UPDATE_DATE,
T.LAST_UPDATED_BY,
T.DATA_STATUS
) =
(
select S.HOST,
S.PORT,
S.NAME,
S.APPLICATION,
S.CONTEXT_ROOT,
S.SERVICE,
S.PATH,
S.DEPLOYMENT_STATE,
S.SOURCE_INFORMATION,
S.ACTIVE_SERVER_COUNT,
S.SERVLETS,
S.PENDING_REQUESTS,
S.COMPLETED_REQUESTS,
S.OPEN_SESSIONS_CURRENT_COUNT,
S.OPEN_SESSIONS_HIGH_COUNT,
S.SESSIONS_OPENED_TOTAL_COUNT,
S.OBJECT_VERSION_NUMBER,
S.CREATION_DATE,
S.CREATED_BY,
S.LAST_UPDATE_DATE,
S.LAST_UPDATED_BY,
S.DATA_STATUS
from FUSION.I$_DMS_TEST_DELET_TARGET S
where T.ID =S.ID
)
where (ID)
in (
select ID
from FUSION.I$_DMS_TEST_DELET_TARGET
where IND_UPDATE = 'U'
)
28. 插入不存在的行
/* DETECTION_STRATEGY = NOT_EXISTS */
insert into FUSION.DMS_TEST_DELET_TARGET T
(
ID,
HOST,
PORT,
NAME,
APPLICATION,
CONTEXT_ROOT,
SERVICE,
PATH,
DEPLOYMENT_STATE,
SOURCE_INFORMATION,
ACTIVE_SERVER_COUNT,
SERVLETS,
PENDING_REQUESTS,
COMPLETED_REQUESTS,
OPEN_SESSIONS_CURRENT_COUNT,
OPEN_SESSIONS_HIGH_COUNT,
SESSIONS_OPENED_TOTAL_COUNT,
OBJECT_VERSION_NUMBER,
CREATION_DATE,
CREATED_BY,
LAST_UPDATE_DATE,
LAST_UPDATED_BY,
DATA_STATUS
)
select ID,
HOST,
PORT,
NAME,
APPLICATION,
CONTEXT_ROOT,
SERVICE,
PATH,
DEPLOYMENT_STATE,
SOURCE_INFORMATION,
ACTIVE_SERVER_COUNT,
SERVLETS,
PENDING_REQUESTS,
COMPLETED_REQUESTS,
OPEN_SESSIONS_CURRENT_COUNT,
OPEN_SESSIONS_HIGH_COUNT,
SESSIONS_OPENED_TOTAL_COUNT,
OBJECT_VERSION_NUMBER,
CREATION_DATE,
CREATED_BY,
LAST_UPDATE_DATE,
LAST_UPDATED_BY,
DATA_STATUS
from FUSION.I$_DMS_TEST_DELET_TARGET S
where IND_UPDATE = 'I'
29.Enable Index
30.Enable constraints
31. 提交事务
/*commit*/
32. 删除工作表
drop table FUSION.C$_0SOURCE purge
33. 删除插入表
drop table FUSION.I$_DMS_TEST_DELET_TARGET 删除插入表
取消锁定订户
1. 检验版本
DECLARE -- Validating KM options
dbVersion1 NUMBER(2,0);
dbVersion2 NUMBER(2,0);
BEGIN
-- Verify RDBMS version
select
case when upper(name) = 'COMPATIBLE' then to_number(substr(value, 1, instr(value, '.',1,1)-1)) else null end v0,
case when upper(name) = 'COMPATIBLE' then to_number(substr(value, instr(value, '.',1,1)+1, instr(value, '.',1,2)-instr(value, '.',1,1)-1)) else null end v1
into dbVersion1, dbVersion2
from v$parameter
where upper(name) = 'COMPATIBLE';
IF dbVersion1 < 9 THEN
raise_application_error(-20101, 'ODIKM-ORA-10000: You are using RDBMS version'||dbVersion1||'.'||dbVersion2||'. This version does not support JKM COMPATIBLE= 9.');
END IF;
END;
2. 设置参数
import java.sql as sql
import java.lang as lang
# Connect to the Source Database ie the one that hosts the journalized tables
myCon = odiRef.getJDBCConnection("SRC")
# Create the list of Subscribers for a CDC Set
lstSbs = []
lstSbs.append('Zero')
3.
updCmd = """
update FUSION.SNP_CDC_SUBS SUBS
set SUBS.MIN_WINDOW_ID = SUBS.MAX_WINDOW_ID_INS
where SUBS.CDC_SET_NAME = 'FUSION.FUSIONTABLEUPDATEDEMO'
and SUBS.CDC_SUBSCRIBER = ?
"""
# Prepare the update
prepUpdStmt = myCon.prepareStatement(updCmd)
# Execute it for each subscriber
for sbs in lstSbs:
prepUpdStmt.setString(1, sbs)
nbRows = prepUpdStmt.executeUpdate()
prepUpdStmt.close()
# Commit the updates
myCon.commit()
4. 关闭连接
myCon.close()
清除日记
1. 检验版本
DECLARE -- Validating KM options
dbVersion1 NUMBER(2,0);
dbVersion2 NUMBER(2,0);
BEGIN
-- Verify RDBMS version
select
case when upper(name) = 'COMPATIBLE' then to_number(substr(value, 1, instr(value, '.',1,1)-1)) else null end v0,
case when upper(name) = 'COMPATIBLE' then to_number(substr(value, instr(value, '.',1,1)+1, instr(value, '.',1,2)-instr(value, '.',1,1)-1)) else null end v1
into dbVersion1, dbVersion2
from v$parameter
where upper(name) = 'COMPATIBLE';
IF dbVersion1 < 9 THEN
raise_application_error(-20101, 'ODIKM-ORA-10000: You are using RDBMS version'||dbVersion1||'.'||dbVersion2||'. This version does not support JKM COMPATIBLE= 9.');
END IF;
END;
2. 删除已同步的数据
/* Delete rows from journalizing tables that are useless for each subscriber */
delete from FUSION.J$DMS_TEST_DELET_SOURCE JRN
where JRN.WINDOW_ID <= (select min(SUBS.MIN_WINDOW_ID)
from FUSION.SNP_CDC_SUBS SUBS
where SUBS.CDC_SET_NAME = 'FUSION.FUSIONTABLEUPDATEDEMO'
)
3. 关闭连接 一次 CDC 同步结束
myCon.close()