Oracle Streams 11g Configuration (Schema Replication) Procedures
Prerequisite:
1. Create Tablespace for Oracle Streams administrator schema on both databases (SRC & DEST)
(This tablespace stores any objects created in the Oracle Streams administrator schema, including any spillover of messages from the buffered queues owned by the schema.)
=> create tablespace streams_tbs datafile '+DATA' size 50M autoextend on next 10M maxsize unlimited default compress;
2. Create a new user to act as the Oracle Streams administrator on both databases (SRC & DEST).
create user strmadmin identified by xxx
default tablespace streams_tbs quota unlimited on streams_tbs
temporary tablespace temp;
grant dba to strmadmin;
exec DBMS_STREAMS_AUTH.GRANT_ADMIN_PRIVILEGE(grantee => 'strmadmin', grant_privileges => TRUE);
3. Add Net Service Names (if not exists) on both databases.
tnsnames.ora
>>
SRC =
(DESCRIPTION =
(ADDRESS = (PROTOCOL = TCP)(HOST = 192.168.1.100)(PORT = 1521))
(CONNECT_DATA =
(SERVER = DEDICATED)
(SERVICE_NAME = SRC)
)
)
DEST =
(DESCRIPTION =
(ADDRESS = (PROTOCOL = TCP)(HOST = 192.168.1.200)(PORT = 1521))
(CONNECT_DATA =
(SERVER = DEDICATED)
(SERVICE_NAME = DEST)
)
)
4. Create Database Links and Data Pump Directories:
[Source DB]
create database link DEST connect to strmadmin identified by xxx using 'DEST';
create directory SRC_DIR as '/data_pump/SRC/';
[Destination DB]
create database link SRC connect to strmadmin identified by xxx using 'SRC';
create directory DEST_DIR as '/data_pump/DEST/';
5. Create relevant tablespaces which store objects owned by the replication schema (e.g. SCOTT)
in the DEST db.
6. Set the STREAMS_POOL_SIZE initialization parameter appropriately for your database environment (e.g. 160M).
If both STREAMS_POOL_SIZE and SGA_TARGET are set to 0 (zero), then, by default, the first use of Streams in a database
transfers an amount of memory equal to 10% of the shared pool from the buffer cache to the Streams pool.
7. In DEST db, set the service_names = 'DEST,SYS$STRMADMIN.SRC_APPQ.DEST' to avoid TNS-12564: TNS:connection refused
error.
Execute Configuration SQL script (setup.sql)
setup.sql is shown as follows:
-- Banner: names the two sites this script works with and reminds the operator
-- that the database links must already exist before the script is run.
PROMPT db1: Source Database - SRC
PROMPT db2: Destination Database - DEST
PROMPT While executing the script command line make sure to create Database Links
--------------------------------------------------------------------------------------
-- get TNSNAME and streams admin user details for both the databases
--------------------------------------------------------------------------------------
-- The six values below are read from the script's positional parameters 1 to 6
-- and stored in substitution variables that the later CONNECT commands use.
-- Positional parameter 1: TNS alias of the source database
PROMPT
PROMPT 'Enter TNS Name of site 1 (Source DB) as parameter 1:'
DEFINE db1 = &1
-- Positional parameter 2: Streams administrator user name on the source database
PROMPT
PROMPT 'Enter streams admin username for site 1 (Source DB) as parameter 2:'
DEFINE strm_adm_db1 = &2
-- Positional parameter 3: Streams administrator password on the source database
PROMPT
PROMPT 'Enter streams admin password for site 1 (Source DB) as parameter 3:'
DEFINE strm_adm_pwd_db1 = &3
-- Positional parameter 4: TNS alias of the destination database
PROMPT
PROMPT 'Enter TNS Name of site 2 (DEST DB) as parameter 4:'
DEFINE db2 = &4
-- Positional parameter 5: Streams administrator user name on the destination database
PROMPT
PROMPT 'Enter streams admin username for site 2 (DEST DB) as parameter 5:'
DEFINE strm_adm_db2 = &5
-- Positional parameter 6: Streams administrator password on the destination database
PROMPT
PROMPT 'Enter streams admin password for site 2 (DEST DB) as parameter 6:'
DEFINE strm_adm_pwd_db2 = &6
-- connect as streams administrator to site 1 (Source DB)
-- NOTE(review): the PL/SQL blocks that follow hard-code STRMADMIN, SRC and DEST
-- rather than using the values captured above, so the parameters are expected to match
PROMPT Connecting as streams administrator to site 1 (Source DB)
CONNECT &strm_adm_db1/&strm_adm_pwd_db1@&db1
--
-- Create the capture-side queue "STRMADMIN"."SRC_CAPQ" on the source database.
-- The queue is backed by queue table SRC_CAPQT and STRMADMIN is its queue user.
--
DECLARE
  c_queue_table CONSTANT VARCHAR2(65) := '"STRMADMIN"."SRC_CAPQT"';
  c_queue_name  CONSTANT VARCHAR2(65) := '"STRMADMIN"."SRC_CAPQ"';
  c_queue_user  CONSTANT VARCHAR2(32) := '"STRMADMIN"';
BEGIN
  dbms_streams_adm.set_up_queue(
    queue_table    => c_queue_table,
    storage_clause => NULL,
    queue_name     => c_queue_name,
    queue_user     => c_queue_user);
END;
/
--
-- PROPAGATE changes for schema SCOTT
-- Creates the propagation from SRC_CAPQ (source) to SRC_APPQ at DEST.
-- Queue-to-queue propagation is requested only when the compatibility
-- level reported by DEST is 10.2 or higher.
--
DECLARE
  l_major       NUMBER := 0;     -- first component of the DEST compatibility string
  l_minor       NUMBER := 0;     -- second component of the DEST compatibility string
  l_dot1        NUMBER;          -- position of the first '.'
  l_dot2        NUMBER;          -- position of the second '.'
  l_use_q2q     BOOLEAN;
  l_remote_ver  VARCHAR2(30);    -- returned by db_version but not used further
  l_remote_cmp  VARCHAR2(30);
BEGIN
  -- Probe the destination over the DEST database link. Any failure in the
  -- probe or in parsing its answer falls back to non queue-to-queue propagation.
  BEGIN
    EXECUTE IMMEDIATE
      'BEGIN dbms_utility.db_version@"DEST"(:ver, :compat); END;'
      USING OUT l_remote_ver, OUT l_remote_cmp;

    l_dot1 := INSTR(l_remote_cmp, '.');
    IF l_dot1 > 0 THEN
      l_major := TO_NUMBER(SUBSTR(l_remote_cmp, 1, l_dot1 - 1));
      l_dot2  := INSTR(l_remote_cmp, '.', l_dot1 + 1);
      IF l_dot2 > 0 THEN
        l_minor := TO_NUMBER(SUBSTR(l_remote_cmp, l_dot1 + 1, l_dot2 - l_dot1 - 1));
      ELSE
        l_minor := TO_NUMBER(SUBSTR(l_remote_cmp, l_dot1 + 1));
      END IF;
    ELSE
      -- No dot at all: the whole string is the version number
      l_major := TO_NUMBER(SUBSTR(l_remote_cmp, 1));
    END IF;

    -- Default to FALSE so that a NULL comparison result also yields FALSE
    l_use_q2q := FALSE;
    IF l_major > 10 OR (l_major = 10 AND l_minor >= 2) THEN
      l_use_q2q := TRUE;
    END IF;
  EXCEPTION
    WHEN OTHERS THEN
      l_use_q2q := FALSE;
  END;

  dbms_streams_adm.add_schema_propagation_rules(
    schema_name            => '"SCOTT"',
    streams_name           => '',
    source_queue_name      => '"STRMADMIN"."SRC_CAPQ"',
    destination_queue_name => '"STRMADMIN"."SRC_APPQ"@DEST',
    include_dml            => TRUE,
    include_ddl            => TRUE,
    include_tagged_lcr     => TRUE,
    source_database        => 'SRC',
    inclusion_rule         => TRUE,
    and_condition          => NULL,
    queue_to_queue         => l_use_q2q);
END;
/
--
-- Disable propagation. Enable after destination has been setup
-- The destination queue is passed only for a queue-to-queue propagation.
-- ORA-24065 (schedule already disabled) is ignored.
--
DECLARE
  l_is_q2q           VARCHAR2(10);
  l_dest_queue       VARCHAR2(65);
  e_already_disabled EXCEPTION;
  PRAGMA EXCEPTION_INIT(e_already_disabled, -24065);
BEGIN
  SELECT p.queue_to_queue
    INTO l_is_q2q
    FROM dba_propagation p
   WHERE p.source_queue_owner      = 'STRMADMIN'
     AND p.source_queue_name       = 'SRC_CAPQ'
     AND p.destination_queue_owner = 'STRMADMIN'
     AND p.destination_queue_name  = 'SRC_APPQ'
     AND p.destination_dblink      = 'DEST';

  -- CASE without ELSE yields NULL, which is what a non queue-to-queue schedule needs
  l_dest_queue := CASE WHEN l_is_q2q = 'TRUE' THEN '"STRMADMIN"."SRC_APPQ"' END;

  dbms_aqadm.disable_propagation_schedule(
    queue_name        => '"STRMADMIN"."SRC_CAPQ"',
    destination       => 'DEST',
    destination_queue => l_dest_queue);
EXCEPTION
  WHEN e_already_disabled THEN
    NULL; -- propagation already disabled
END;
/
--
-- CAPTURE changes for schema SCOTT
-- Adds DML and DDL capture rules for SCOTT to capture process SRC$CAP on
-- queue SRC_CAPQ. Each rule carries an extra condition that limits LCRs to
-- the compatibility level derived from the local "compatible" parameter.
--
DECLARE
  l_compatible    VARCHAR2(512);
  l_dot1          NUMBER;          -- position of the first '.'
  l_dot2          NUMBER;          -- position of the second '.'
  l_major         NUMBER;
  l_minor         NUMBER;
  l_compat_const  VARCHAR2(65);    -- name of the dbms_streams compatibility constant
  l_and_condition VARCHAR2(4000);
BEGIN
  SELECT p.value
    INTO l_compatible
    FROM v$parameter p
   WHERE p.name = 'compatible';

  -- Split off the first two dot-separated components. Both stay NULL when
  -- the parameter value contains no dot.
  l_dot1 := INSTR(l_compatible, '.');
  IF l_dot1 > 0 THEN
    l_major := TO_NUMBER(SUBSTR(l_compatible, 1, l_dot1 - 1));
    l_dot2  := INSTR(l_compatible, '.', l_dot1 + 1);
    IF l_dot2 > 0 THEN
      l_minor := TO_NUMBER(SUBSTR(l_compatible, l_dot1 + 1, l_dot2 - l_dot1 - 1));
    ELSE
      l_minor := TO_NUMBER(SUBSTR(l_compatible, l_dot1 + 1));
    END IF;
  END IF;

  -- Branch order matters: a NULL comparison falls through to the next branch
  l_compat_const :=
    CASE
      WHEN l_major < 10                 THEN 'dbms_streams.compatible_9_2'
      WHEN l_major = 10 AND l_minor < 2 THEN 'dbms_streams.compatible_10_1'
      WHEN l_major = 10                 THEN 'dbms_streams.compatible_10_2'
      WHEN l_major = 11 AND l_minor < 2 THEN 'dbms_streams.compatible_11_1'
      ELSE 'dbms_streams.compatible_11_2'
    END;

  l_and_condition := ':lcr.get_compatible() <= ' || l_compat_const;

  dbms_streams_adm.add_schema_rules(
    schema_name        => '"SCOTT"',
    streams_type       => 'CAPTURE',
    streams_name       => '"SRC$CAP"',
    queue_name         => '"STRMADMIN"."SRC_CAPQ"',
    include_dml        => TRUE,
    include_ddl        => TRUE,
    include_tagged_lcr => TRUE,
    source_database    => 'SRC',
    inclusion_rule     => TRUE,
    and_condition      => l_and_condition);
END;
/
--
-- Datapump SCHEMA MODE EXPORT
-- Exports the replicated schema(s) into directory object SRC_DIR, waits for
-- the job to finish, then copies the dump file to directory object DEST_DIR
-- on the DEST database. The export version is the lower of the two
-- databases' "compatible" settings so the destination can read the dump.
-- Requires the SRC_DIR directory object locally and DEST_DIR on DEST.
--
DECLARE
  h1               NUMBER := NULL;            -- data pump job handle
  schema_expr_list VARCHAR2(32767);           -- for metadata_filter
  object_owner     dbms_utility.uncl_array;   -- obj owners
  job_state        VARCHAR2(30);              -- job state
  status           ku$_Status;                -- data pump status
  job_not_exist    EXCEPTION;
  PRAGMA EXCEPTION_INIT(job_not_exist, -31626);
  local_compat     v$parameter.value%TYPE;
  remote_compat    v$parameter.value%TYPE;
  min_compat       v$parameter.value%TYPE;
  ind              NUMBER;
  le               ku$_LogEntry;              -- For WIP and error messages

  -- Turns a dotted version such as '11.2.0.4' into a fixed-width string
  -- whose plain string ordering matches the version ordering. The first five
  -- numeric components are used, missing ones count as 0, and each is
  -- zero-padded to six digits.
  FUNCTION version_key(p_version IN VARCHAR2) RETURN VARCHAR2 IS
    l_key VARCHAR2(40);
  BEGIN
    FOR i IN 1 .. 5 LOOP
      l_key := l_key
            || LPAD(NVL(REGEXP_SUBSTR(p_version, '[0-9]+', 1, i), '0'), 6, '0');
    END LOOP;
    RETURN l_key;
  END version_key;
BEGIN
  -- Schemas to export. Add further owners as object_owner(2), (3), ...
  object_owner(1) := 'SCOTT';

  -- Build the list used by the SCHEMA_EXPR filter, e.g. ('SCOTT')
  FOR idx IN 1 .. object_owner.COUNT LOOP
    IF schema_expr_list IS NULL THEN
      schema_expr_list := '(';
    ELSE
      schema_expr_list := schema_expr_list || ',';
    END IF;
    schema_expr_list := schema_expr_list || '''' || object_owner(idx) || '''';
  END LOOP;

  IF schema_expr_list IS NOT NULL THEN
    schema_expr_list := schema_expr_list || ')';
  ELSE
    -- Nothing to export
    COMMIT;
    RETURN;
  END IF;

  select value into local_compat from v$parameter@"SRC"
   where name = 'compatible';
  select value into remote_compat from v$parameter@"DEST"
   where name = 'compatible';

  -- Compare component by component. Replacing '.' with '0' and comparing the
  -- resulting numbers gives the wrong answer when the two strings have a
  -- different number of components (for example '11.2.0' and '10.2.0.4.0').
  IF version_key(local_compat) > version_key(remote_compat) THEN
    min_compat := remote_compat;
  ELSE
    min_compat := local_compat;
  END IF;

  h1 := dbms_datapump.open(
          operation   => 'EXPORT',
          job_mode    => 'SCHEMA',
          remote_link => '',
          job_name    => NULL,
          version     => min_compat);

  dbms_datapump.metadata_filter(
    handle => h1,
    name   => 'SCHEMA_EXPR',
    value  => 'IN' || schema_expr_list);

  dbms_datapump.add_file(
    handle    => h1,
    filename  => 'streams_setup_2015_3_19_10_00_00_001.dmp',
    directory => 'SRC_DIR',
    filetype  => dbms_datapump.ku$_file_type_dump_file);

  dbms_datapump.add_file(
    handle    => h1,
    filename  => 'streams_setup_2015_3_19_10_00_00_001.log',
    directory => 'SRC_DIR',
    filetype  => dbms_datapump.ku$_file_type_log_file);

  dbms_datapump.start_job(h1);

  -- Poll until the job reports COMPLETED or STOPPED. ORA-31626 (job does
  -- not exist) while polling is treated as the job having finished.
  job_state := 'UNDEFINED';
  BEGIN
    WHILE (job_state != 'COMPLETED') AND (job_state != 'STOPPED') LOOP
      status := dbms_datapump.get_status(
                  handle  => h1,
                  mask    => dbms_datapump.ku$_status_job_error +
                             dbms_datapump.ku$_status_job_status +
                             dbms_datapump.ku$_status_wip,
                  timeout => -1);
      job_state := status.job_status.state;
      dbms_lock.sleep(10);
    END LOOP;
  EXCEPTION WHEN job_not_exist THEN
    dbms_output.put_line('job finished');
  END;

  -- Transfer dump file to the destination directory
  dbms_file_transfer.put_file(
    source_directory_object      => '"SRC_DIR"',
    source_file_name             => 'streams_setup_2015_3_19_10_00_00_001.dmp',
    destination_directory_object => '"DEST_DIR"',
    destination_file_name        => 'streams_setup_2015_3_19_10_00_00_001.dmp',
    destination_database         => 'DEST');
  COMMIT;
EXCEPTION WHEN OTHERS THEN
  -- Report the failure and whatever the data pump job can still tell us,
  -- then roll back and re-raise so the script run shows the error.
  dbms_output.put_line('Exception, sql code = ' || SQLCODE || ', error message: ' || SQLERRM);
  dbms_output.put_line( dbms_utility.format_error_stack);
  dbms_output.put_line( dbms_utility.format_error_backtrace);
  IF h1 IS NOT NULL THEN
    BEGIN
      dbms_datapump.get_status(
        handle    => h1,
        mask      => dbms_datapump.ku$_status_job_error +
                     dbms_datapump.ku$_status_job_status +
                     dbms_datapump.ku$_status_wip,
        timeout   => -1,
        job_state => job_state,
        status    => status );
      dbms_output.put_line('Data pump job status: ' || job_state);

      le := status.wip;
      IF le IS NULL THEN
        dbms_output.put_line('WIP info is NULL');
      ELSE
        dbms_output.put_line('WIP info:');
        ind := le.FIRST;
        WHILE ind IS NOT NULL LOOP
          dbms_output.put_line(le(ind).LogText);
          ind := le.NEXT(ind);
        END LOOP;
      END IF;

      le := status.error;
      IF le IS NULL THEN
        dbms_output.put_line('Error info is NULL');
      ELSE
        dbms_output.put_line('Error info:');
        ind := le.FIRST;
        WHILE ind IS NOT NULL LOOP
          dbms_output.put_line(le(ind).LogText);
          ind := le.NEXT(ind);
        END LOOP;
      END IF;
    EXCEPTION
      WHEN job_not_exist THEN
        dbms_output.put_line('Data pump job finished');
    END;
  END IF;
  ROLLBACK;
  RAISE;
END;
/
-- Switch the session to site 2 (Destination DB) as the Streams administrator.
-- The user, password and TNS alias come from the substitution variables
-- strm_adm_db2, strm_adm_pwd_db2 and db2.
-- The import, apply queue and apply process steps that follow run there.
PROMPT Connecting as streams administrator to site 2
CONNECT &strm_adm_db2/&strm_adm_pwd_db2@&db2
--
-- Datapump SCHEMA MODE IMPORT
-- Imports the dump file found in directory object DEST_DIR and waits for the
-- job to finish. The import version is the lower of the two databases'
-- "compatible" settings. No schema filter is applied, so everything in the
-- dump file is imported.
--
DECLARE
  h1               NUMBER := NULL;            -- data pump job handle
  schema_expr_list VARCHAR2(32767);           -- only used to detect an empty owner list
  object_owner     dbms_utility.uncl_array;   -- obj owners
  job_state        VARCHAR2(30);              -- job state
  status           ku$_Status;                -- data pump status
  job_not_exist    EXCEPTION;
  PRAGMA EXCEPTION_INIT(job_not_exist, -31626);
  src_compat       v$parameter.value%TYPE;    -- "compatible" read over the SRC reference
  dest_compat      v$parameter.value%TYPE;    -- "compatible" read over the DEST reference
  min_compat       v$parameter.value%TYPE;
  ind              NUMBER;
  le               ku$_LogEntry;              -- For WIP and error messages

  -- Turns a dotted version such as '11.2.0.4' into a fixed-width string
  -- whose plain string ordering matches the version ordering. The first five
  -- numeric components are used, missing ones count as 0, and each is
  -- zero-padded to six digits.
  FUNCTION version_key(p_version IN VARCHAR2) RETURN VARCHAR2 IS
    l_key VARCHAR2(40);
  BEGIN
    FOR i IN 1 .. 5 LOOP
      l_key := l_key
            || LPAD(NVL(REGEXP_SUBSTR(p_version, '[0-9]+', 1, i), '0'), 6, '0');
    END LOOP;
    RETURN l_key;
  END version_key;
BEGIN
  -- Schemas expected in the dump. Add further owners as object_owner(2), (3), ...
  object_owner(1) := 'SCOTT';

  FOR idx IN 1 .. object_owner.COUNT LOOP
    IF schema_expr_list IS NULL THEN
      schema_expr_list := '(';
    ELSE
      schema_expr_list := schema_expr_list || ',';
    END IF;
    schema_expr_list := schema_expr_list || '''' || object_owner(idx) || '''';
  END LOOP;

  IF schema_expr_list IS NOT NULL THEN
    schema_expr_list := schema_expr_list || ')';
  ELSE
    -- Nothing to import
    COMMIT;
    RETURN;
  END IF;

  select value into src_compat from v$parameter@"SRC"
   where name = 'compatible';
  select value into dest_compat from v$parameter@"DEST"
   where name = 'compatible';

  -- Compare component by component. Replacing '.' with '0' and comparing the
  -- resulting numbers gives the wrong answer when the two strings have a
  -- different number of components (for example '11.2.0' and '10.2.0.4.0').
  IF version_key(src_compat) > version_key(dest_compat) THEN
    min_compat := dest_compat;
  ELSE
    min_compat := src_compat;
  END IF;

  h1 := dbms_datapump.open(
          operation   => 'IMPORT',
          job_mode    => 'SCHEMA',
          remote_link => '',
          job_name    => NULL,
          version     => min_compat);

  dbms_datapump.add_file(
    handle    => h1,
    filename  => 'streams_setup_2015_3_19_10_00_00_001.dmp',
    directory => 'DEST_DIR',
    filetype  => dbms_datapump.ku$_file_type_dump_file);

  dbms_datapump.add_file(
    handle    => h1,
    filename  => 'streams_setup_2015_3_19_10_00_00_001.log',
    directory => 'DEST_DIR',
    filetype  => dbms_datapump.ku$_file_type_log_file);

  dbms_datapump.start_job(h1);

  -- Poll until the job reports COMPLETED or STOPPED. ORA-31626 (job does
  -- not exist) while polling is treated as the job having finished.
  job_state := 'UNDEFINED';
  BEGIN
    WHILE (job_state != 'COMPLETED') AND (job_state != 'STOPPED') LOOP
      status := dbms_datapump.get_status(
                  handle  => h1,
                  mask    => dbms_datapump.ku$_status_job_error +
                             dbms_datapump.ku$_status_job_status +
                             dbms_datapump.ku$_status_wip,
                  timeout => -1);
      job_state := status.job_status.state;
      dbms_lock.sleep(10);
    END LOOP;
  EXCEPTION WHEN job_not_exist THEN
    dbms_output.put_line('job finished');
  END;
  COMMIT;
EXCEPTION WHEN OTHERS THEN
  -- Report the failure and whatever the data pump job can still tell us,
  -- then roll back and re-raise so the script run shows the error.
  dbms_output.put_line('Exception, sql code = ' || SQLCODE || ', error message: ' || SQLERRM);
  dbms_output.put_line( dbms_utility.format_error_stack);
  dbms_output.put_line( dbms_utility.format_error_backtrace);
  IF h1 IS NOT NULL THEN
    BEGIN
      dbms_datapump.get_status(
        handle    => h1,
        mask      => dbms_datapump.ku$_status_job_error +
                     dbms_datapump.ku$_status_job_status +
                     dbms_datapump.ku$_status_wip,
        timeout   => -1,
        job_state => job_state,
        status    => status );
      dbms_output.put_line('Data pump job status: ' || job_state);

      le := status.wip;
      IF le IS NULL THEN
        dbms_output.put_line('WIP info is NULL');
      ELSE
        dbms_output.put_line('WIP info:');
        ind := le.FIRST;
        WHILE ind IS NOT NULL LOOP
          dbms_output.put_line(le(ind).LogText);
          ind := le.NEXT(ind);
        END LOOP;
      END IF;

      le := status.error;
      IF le IS NULL THEN
        dbms_output.put_line('Error info is NULL');
      ELSE
        dbms_output.put_line('Error info:');
        ind := le.FIRST;
        WHILE ind IS NOT NULL LOOP
          dbms_output.put_line(le(ind).LogText);
          ind := le.NEXT(ind);
        END LOOP;
      END IF;
    EXCEPTION
      WHEN job_not_exist THEN
        dbms_output.put_line('Data pump job finished');
    END;
  END IF;
  ROLLBACK;
  RAISE;
END;
/
--
-- Create the apply-side queue "STRMADMIN"."SRC_APPQ" on the destination database.
-- The queue is backed by queue table SRC_APPQT and STRMADMIN is its queue user.
--
DECLARE
  c_queue_table CONSTANT VARCHAR2(65) := '"STRMADMIN"."SRC_APPQT"';
  c_queue_name  CONSTANT VARCHAR2(65) := '"STRMADMIN"."SRC_APPQ"';
  c_queue_user  CONSTANT VARCHAR2(32) := '"STRMADMIN"';
BEGIN
  dbms_streams_adm.set_up_queue(
    queue_table    => c_queue_table,
    storage_clause => NULL,
    queue_name     => c_queue_name,
    queue_user     => c_queue_user);
END;
/
--
-- APPLY changes for schema SCOTT
-- Adds DML and DDL apply rules for SCOTT on queue SRC_APPQ. No apply name
-- is given (streams_name is empty). Each rule carries an extra condition
-- that limits LCRs to the compatibility level derived from the local
-- "compatible" parameter.
--
DECLARE
  l_compatible    VARCHAR2(512);
  l_dot1          NUMBER;          -- position of the first '.'
  l_dot2          NUMBER;          -- position of the second '.'
  l_major         NUMBER;
  l_minor         NUMBER;
  l_compat_const  VARCHAR2(65);    -- name of the dbms_streams compatibility constant
  l_and_condition VARCHAR2(4000);
BEGIN
  SELECT p.value
    INTO l_compatible
    FROM v$parameter p
   WHERE p.name = 'compatible';

  -- Split off the first two dot-separated components. Both stay NULL when
  -- the parameter value contains no dot.
  l_dot1 := INSTR(l_compatible, '.');
  IF l_dot1 > 0 THEN
    l_major := TO_NUMBER(SUBSTR(l_compatible, 1, l_dot1 - 1));
    l_dot2  := INSTR(l_compatible, '.', l_dot1 + 1);
    IF l_dot2 > 0 THEN
      l_minor := TO_NUMBER(SUBSTR(l_compatible, l_dot1 + 1, l_dot2 - l_dot1 - 1));
    ELSE
      l_minor := TO_NUMBER(SUBSTR(l_compatible, l_dot1 + 1));
    END IF;
  END IF;

  -- Branch order matters: a NULL comparison falls through to the next branch
  l_compat_const :=
    CASE
      WHEN l_major < 10                 THEN 'dbms_streams.compatible_9_2'
      WHEN l_major = 10 AND l_minor < 2 THEN 'dbms_streams.compatible_10_1'
      WHEN l_major = 10                 THEN 'dbms_streams.compatible_10_2'
      WHEN l_major = 11 AND l_minor < 2 THEN 'dbms_streams.compatible_11_1'
      ELSE 'dbms_streams.compatible_11_2'
    END;

  l_and_condition := ':lcr.get_compatible() <= ' || l_compat_const;

  dbms_streams_adm.add_schema_rules(
    schema_name        => '"SCOTT"',
    streams_type       => 'APPLY',
    streams_name       => '',
    queue_name         => '"STRMADMIN"."SRC_APPQ"',
    include_dml        => TRUE,
    include_ddl        => TRUE,
    include_tagged_lcr => TRUE,
    source_database    => 'SRC',
    inclusion_rule     => TRUE,
    and_condition      => l_and_condition);
END;
/
--
-- Get tag value to be used for Apply
-- Starts from the apply process's object id and increments it until no
-- other apply process in dba_apply uses the same tag, then sets that tag
-- on the apply process that handles changes from SRC.
--
DECLARE
  l_tag_in_use  PLS_INTEGER;     -- 1 when another apply process has the candidate tag
  tag_num       NUMBER;
  apply_nm      VARCHAR2(30);
  apply_nm_dqt  VARCHAR2(32);    -- apply name wrapped in double quotes
BEGIN
  SELECT apply_name INTO apply_nm
    FROM dba_apply_progress
   WHERE source_database = 'SRC';
  apply_nm_dqt := '"' || apply_nm || '"';

  -- Use the apply object id as the tag
  SELECT o.object_id INTO tag_num
    FROM dba_objects o
   WHERE o.object_name = apply_nm
     AND o.object_type = 'APPLY';

  LOOP
    -- Existence check. COUNT with ROWNUM = 1 always returns exactly one row,
    -- so several apply processes sharing the candidate tag cannot raise
    -- TOO_MANY_ROWS as a plain SELECT ... INTO would.
    SELECT COUNT(*) INTO l_tag_in_use
      FROM dba_apply
     WHERE apply_name != apply_nm
       AND apply_tag = hextoraw(tag_num)
       AND ROWNUM = 1;
    EXIT WHEN l_tag_in_use = 0;
    tag_num := tag_num + 1;
  END LOOP;

  -- alter apply
  dbms_apply_adm.alter_apply(
    apply_name => apply_nm_dqt,
    apply_tag  => hextoraw(tag_num));
END;
/
--
-- Start apply process applying changes from SRC
-- The apply name is looked up in dba_apply_progress by source database.
-- ORA-26666 is ignored and treated as the process already running.
--
DECLARE
  l_apply_name   VARCHAR2(32);
  l_quoted_name  VARCHAR2(32);   -- apply name wrapped in double quotes
  e_apply_running EXCEPTION;
  PRAGMA EXCEPTION_INIT(e_apply_running, -26666);
BEGIN
  SELECT ap.apply_name
    INTO l_apply_name
    FROM dba_apply_progress ap
   WHERE ap.source_database = 'SRC';

  l_quoted_name := '"' || l_apply_name || '"';

  dbms_apply_adm.start_apply(apply_name => l_quoted_name);
EXCEPTION
  WHEN e_apply_running THEN
    NULL; -- APPLY process already running
END;
/
-- Switch the session back to site 1 (Source DB) as the Streams administrator.
-- The user, password and TNS alias come from the substitution variables
-- strm_adm_db1, strm_adm_pwd_db1 and db1.
-- The remaining steps enable propagation and start capture there.
PROMPT Connecting as streams administrator to site 1
CONNECT &strm_adm_db1/&strm_adm_pwd_db1@&db1
--
-- Enable propagation schedule for "STRMADMIN"."SRC_CAPQ"
-- to DEST
-- The destination queue is passed only for a queue-to-queue propagation.
-- ORA-24064 (schedule already enabled) is ignored.
--
DECLARE
  l_is_q2q          VARCHAR2(10);
  l_dest_queue      VARCHAR2(65);
  e_already_enabled EXCEPTION;
  PRAGMA EXCEPTION_INIT(e_already_enabled, -24064);
BEGIN
  SELECT p.queue_to_queue
    INTO l_is_q2q
    FROM dba_propagation p
   WHERE p.source_queue_owner      = 'STRMADMIN'
     AND p.source_queue_name       = 'SRC_CAPQ'
     AND p.destination_queue_owner = 'STRMADMIN'
     AND p.destination_queue_name  = 'SRC_APPQ'
     AND p.destination_dblink      = 'DEST';

  -- CASE without ELSE yields NULL, which is what a non queue-to-queue schedule needs
  l_dest_queue := CASE WHEN l_is_q2q = 'TRUE' THEN '"STRMADMIN"."SRC_APPQ"' END;

  dbms_aqadm.enable_propagation_schedule(
    queue_name        => '"STRMADMIN"."SRC_CAPQ"',
    destination       => 'DEST',
    destination_queue => l_dest_queue);
EXCEPTION
  WHEN e_already_enabled THEN
    NULL; -- propagation already enabled
END;
/
--
-- Start capture process SRC$CAP
-- ORA-26666 is ignored and treated as the process already running.
--
DECLARE
  c_capture_name    CONSTANT VARCHAR2(32) := '"SRC$CAP"';
  e_capture_running EXCEPTION;
  PRAGMA EXCEPTION_INIT(e_capture_running, -26666);
BEGIN
  dbms_capture_adm.start_capture(capture_name => c_capture_name);
EXCEPTION
  WHEN e_capture_running THEN
    NULL; -- CAPTURE process already running
END;
/
