Потоки данных в Oracle - Формирование потоков

ОГЛАВЛЕНИЕ

Формирование потоков

Создадим очередь для передачи событий в БД-источнике и очередь для применения событий в БД-получателе, например:

EXECUTE DBMS_STREAMS_ADM.SET_UP_QUEUE ( )
CONNECT streamadmin/streamadmin@destination
EXECUTE DBMS_STREAMS_ADM.SET_UP_QUEUE ( )

Коли указано специально, очереди в обеих БД (и таблицы для данных этих очередей) получили умолчательные названия. Их можно наблюдать так:

SQL> CONNECT streamadmin/streamadmin@source
Connected.

SQL> SELECT name, queue_table FROM user_queues;

NAME                           QUEUE_TABLE
------------------------------ ------------------------------
STREAMS_QUEUE                  STREAMS_QUEUE_TABLE
AQ$_STREAMS_QUEUE_TABLE_E      STREAMS_QUEUE_TABLE

Очередь AQ$_*_E создается автоматически для сообщений об ошибках обработки событий.

Для возможности передавать потоком изменения в исходной таблице SCOTT.EMP требуется заявить расширенную журнализацию хотя бы для этой таблицы:

CONNECT scott/tiger@source
ALTER TABLE emp ADD SUPPLEMENTAL LOG DATA ( PRIMARY KEY ) COLUMNS;

Проверка:

SQL> SELECT always, table_name, log_group_type FROM user_log_groups;

ALWAYS      TABLE_NAME                     LOG_GROUP_TYPE
----------- ------------------------------ -------------------
ALWAYS      EMP                            PRIMARY KEY LOGGING

Теперь правка любого поля в таблице EMP будет сопровождаться (безусловно) занесением в журнал не только старого и нового значений этого поля, но также и значения ключевого поля (то есть EMPNO).

В БД-источнике создадим процесс захвата изменений, одновременно указав правила отбора изменений в очередь:

CONNECT streamadmin/streamadmin@source BEGIN
DBMS_STREAMS_ADM.ADD_TABLE_RULES (
  table_name     => 'scott.emp'
, streams_type   => 'capture'
, streams_name   => 'capture_stream'
, include_ddl    => TRUE
);
END;
/

Проверка:

SQL> SELECT capture_name, queue_name, queue_owner, status
  2  FROM all_capture;

CAPTURE_NAME       QUEUE_NAME         QUEUE_OWNER        STATUS
------------------ ------------------ ------------------ --------
CAPTURE_STREAM     STREAMS_QUEUE      STREAMADMIN        DISABLED

Среди прочих умолчаний при создании процесса захвата изменений выше использовано подразумеваемое молчаливо имя очереди STREAMS_QUEUE. В нашем случае это можно было бы обозначить явно, указав параметр QUEUE_NAME => 'streamadmin.streams_queue'. Этим же параметром можно воспользоваться, когда процесс захвата потребуется связать с очередью под иным именем.

Правила отбора изменений в очередь STREAMS_QUEUE также были построены автоматически, но могли бы быть дополнены, или даже выписаны явно с помощью других параметров процедуры ADD_TABLE_RULES.

Создадим процесс переноса изменений:

BEGIN
DBMS_STREAMS_ADM.ADD_TABLE_PROPAGATION_RULES (
  table_name            => 'scott.emp'
, streams_name       => 'maindb_to_subdb1'
, source_queue_name  => 'streamadmin.streams_queue'
, destination_queue_name
                        => 'Адрес электронной почты защищен от спам-ботов. Для просмотра адреса в вашем браузере должен быть включен Javascript.'
, source_database    => 'maindb.class'
, include_ddl        => TRUE
);
END;
/

Проверка:

SQL> SELECT propagation_name, source_queue_name,
  2         destination_queue_name, status
  3  FROM dba_propagation;

PROPAGATION_NAME SOURCE_QUEUE_NAME DESTINATION_QUEUE_NAM STATUS
---------------- ----------------- --------------------- -------
MAINDB_TO_SUBDB1 STREAMS_QUEUE     STREAMS_QUEUE         ENABLED

Теперь для правильного воспроизведения изменений в принимающей БД требуется передать ей в качестве "точки отсчета" номер изменений в БД-источнике. Передаваться получателям будут только изменения в EMP с номерами более поздними:

BEGIN
Адрес электронной почты защищен от спам-ботов. Для просмотра адреса в вашем браузере должен быть включен Javascript. (
  source_object_name   => 'scott.emp'
, source_database_name => 'maindb.class'
, instantiation_scn    => DBMS_FLASHBACK.GET_SYSTEM_CHANGE_NUMBER
);
END;
/

Убедиться в учете процессом применения для таблиц точки отсчета можно запросом:

SQL> COLUMN source_database FORMAT A20
SQL> SELECT
  2    source_object_name, source_object_type, instantiation_scn
  3  FROM  Адрес электронной почты защищен от спам-ботов. Для просмотра адреса в вашем браузере должен быть включен Javascript.;

SOURCE_OBJECT_NAME SOURCE_OBJE INSTANTIATION_SCN
------------------------------ ----------- -----------------
EMP                            TABLE                 1200698

Принимающая БД готова к активации процесса применения изменений:

CONNECT streamadmin/streamadmin@destination  BEGIN
DBMS_STREAMS_ADM.ADD_TABLE_RULES (
  table_name      => 'scott.emp'
, streams_type    => 'apply'
, streams_name    => 'apply_stream'
, source_database => 'maindb.class'
, include_ddl     => TRUE
);
END;
/

Проверка:

SQL> SELECT apply_name, queue_name, status FROM all_apply;APPLY_NAME               QUEUE_NAME               STATUS
------------------------ ------------------------ --------
APPLY_STREAM             STREAMS_QUEUE            DISABLED

Для удобства отключим реакцию на ошибки, иначе процесс применения изменений может самопроизвольно прекращаться:

BEGIN
DBMS_APPLY_ADM.SET_PARAMETER (
  apply_name  => 'apply_stream'
, parameter   => 'disable_on_error'
, value       => 'N'
);
END;
/

Осталось запустить процессы захвата и примения изменений:

CONNECT streamadmin/streamadmin@source

EXECUTE DBMS_CAPTURE_ADM.START_CAPTURE ( 'capture_stream' )

EXECUTE -
Адрес электронной почты защищен от спам-ботов. Для просмотра адреса в вашем браузере должен быть включен Javascript. ( 'apply_stream' )

Проверка:

SQL> CONNECT streamadmin/streamadmin@source
Connected.
SQL> SELECT empno FROM scott.emp MINUS
  2  SELECT empno FROM Адрес электронной почты защищен от спам-ботов. Для просмотра адреса в вашем браузере должен быть включен Javascript.
  3  .
SQL> SAVE delta REPLACE
Wrote file delta.sql
SQL> @delta

no rows selected

SQL> INSERT INTO scott.emp ( empno ) VALUES ( 3333 );

1 row created.

SQL> @delta

     EMPNO
----------
      3333

SQL> COMMIT;

Commit complete.

SQL> @delta

no rows selected

Заметьте, что поток переносит изменения только в одну сторону. Таблица-приемник при этом не закрыта от обычной правки. Однако же такую правку следует выполнять осмотрительно, поскольку она может привести к ошибкам при автоматическом изменении данных потоком (эта проблема решается специально седствами разрешении конфликтов). Вдобавок учтите, что множественные операции INSERT, UPDATE, DELETE применяются в принимающей БД в рамках одной (автономной) транзакции (невзирая на то, что в журнале БД множественные изменения фиксируются набором однострочных изменений). Следовательно ошибка хотя бы в изменении одной-единственной строки приведет к отказу изменений всей множественной операции.