home *** CD-ROM | disk | FTP | other *** search
/ Winzipper / Winzipper_ISO.iso / programming / oracle7 7.2 / DB / UTIL72 / PRVTALRT.SQL < prev    next >
Encoding:
Text File  |  1995-05-09  |  17.0 KB  |  506 lines

  1. rem 
  2. rem $Header: prvtalrt.sql 7020100.1 94/09/23 22:13:16 cli Generic<base> $ 
  3. rem 
  4. Rem
  5. Rem    NAME
  6. Rem     prvtalrt.sql - Blocking implementation of DBMS "alerts"
  7. Rem    DESCRIPTION
  8. Rem     These are private functions to be released in PL/SQL binary form.
  9. Rem     Routines to wait-for, and signal, a named event.  The waiting
  10. Rem     session will block in the database until the event occurs, or until
  11. Rem     a timeout expires.  The implementation avoids polling except when
  12. Rem     running in parallel server mode.
  13. Rem    RETURNS
  14. Rem 
  15. Rem    NOTES
  16. Rem      The procedural option is needed to use this facility.
  17. Rem      The package body is private.  The public package specification is 
  18. Rem      in dbmsalrt.sql.
  19. Rem
  20. Rem    MODIFIED   (MM/DD/YY)
  21. Rem     adowning   03/29/94 -  merge changes from branch 1.1.710.1
  22. Rem     adowning   02/10/94 -  Branch_for_patch
  23. Rem     adowning   02/10/94 -  Creation
  24. Rem     adowning   02/10/94 -  Creation
  25. Rem     mmoore     03/17/93 -  merge changes from branch 1.6.312.1 
  26. Rem     mmoore     03/11/92 - #(153818) fix looping in signal upon cleanup
  27. Rem     rkooi      12/03/92 - #141803, improve some comments 
  28. Rem     rkooi      11/25/92 -  allow signalling and waiting in same session 
  29. Rem     rkooi      11/17/92 -  pipe cleanup bug 
  30. Rem     rkooi      11/12/92 -  don't call removeall from signal
  31. Rem     rkooi      08/12/92 -  surface removeall function 
  32. Rem     rkooi      06/05/92 -  Creation 
  33. REM 
  34.  
  35. drop table dbms_alert_info
  36. /
  37. create table dbms_alert_info
  38. (
  39.     name     varchar2(30),
  40.     sid      varchar2(30),
  41.     changed  varchar2(1),
  42.     message  varchar2(1800),
  43.     primary key (name, sid)
  44. )
  45. /
  46.  
  47. create or replace package body dbms_alert is
  48.   p_int           number         := 5;  -- poll once every 5 seconds iff
  49.                                         -- it is needed.
  50.   this_session_id   varchar2(30)   := dbms_session.unique_session_id;
  51.   parallel          boolean        := dbms_utility.is_parallel_server;
  52.   sigpipe           varchar2(30)   := 'ORA$ALERT$' || this_session_id;
  53.   msgseq            binary_integer := 0;
  54.   firstregister     boolean        := TRUE;
  55.   instantiating_pkg boolean        := TRUE;
  56.  
  57.   function minimum(v1 number, v2 number) return number is
  58.   begin
  59.     if v1 < v2 then
  60.       return v1;
  61.     else
  62.       return v2;
  63.     end if;
  64.   end;
  65.  
  66.   function alert_hash(name varchar2) return integer is
  67.     hashval binary_integer := 0;
  68.     strlen  binary_integer := lengthb(name);
  69.   begin
  70.     for i in 1..strlen loop
  71.       hashval := hashval + ascii(substrb(name,i,1));
  72.     end loop;
  73.     return 2000000000 + (hashval mod 1021);
  74.   end;
  75.  
  76.   function session_hash(name varchar2) return integer is
  77.     hashval binary_integer := 0;
  78.     strlen  binary_integer := lengthb(name);
  79.   begin
  80.     for i in 1..strlen loop
  81.       hashval := hashval + ascii(substrb(name,i,1));
  82.     end loop;
  83.     return 2000001021 + (hashval mod 1021);
  84.   end;
  85.  
  86.   procedure set_defaults(sensitivity in number) is
  87.   begin
  88.     if sensitivity >= 0 then
  89.       p_int := sensitivity;
  90.     end if;
  91.   end;
  92.  
  93.   procedure register(name in varchar2) is
  94.     status  integer;
  95.     lstatus integer;
  96.     lockid  integer;
  97.     cursor  c1 is
  98.               select distinct substr(kglnaobj,11) sid from x$kglob
  99.               where kglhdnsp = 7
  100.               and   kglnaobj like 'ORA$ALERT$%'
  101.               and   bitand(kglhdflg,128)!=0
  102.               union
  103.               select distinct sid from dbms_alert_info;
  104.   begin
  105.     if instantiating_pkg then
  106.       removeall;
  107.       instantiating_pkg := FALSE;
  108.     end if;
  109.  
  110.     if (firstregister) then
  111.       -- See if there are any orphaned pipes that should be cleaned up
  112.       for rec in c1 loop
  113.         -- see if the session is alive
  114.         lockid := session_hash(rec.sid);
  115.         lstatus := dbms_lock.request(lockid, dbms_lock.x_mode, 
  116.           timeout => 0, release_on_commit => TRUE);
  117.         if lstatus = 0 then
  118.           -- session must be dead so cleanup
  119.           dbms_pipe.purge('ORA$ALERT$' || rec.sid);
  120.           delete dbms_alert_info where sid = rec.sid;
  121.           commit;
  122.         elsif lstatus not in (1,2,4) then -- timeout, deadlock, already own
  123.           raise_application_error(-20000,
  124.             'ORU-10025: lock request error, status: ' || to_char(lstatus));
  125.         end if;
  126.       end loop;
  127.  
  128.       -- get lock to indicate that this session is alive.  Status 4 can occur
  129.       -- if this package gets reinstantiated and we already have this lock.
  130.       -- Use s_mode in case there is a hash collision between two session ids.
  131.       -- Use timeout of 60 in case someone is cleaning up this session id.
  132.       lstatus := dbms_lock.request(session_hash(this_session_id),
  133.         dbms_lock.s_mode, timeout => 60);
  134.       if lstatus != 0  and lstatus != 4 then
  135.         raise_application_error(-20000,
  136.           'ORU-10021: lock request error, status: ' || to_char(lstatus));
  137.       end if;
  138.       firstregister := FALSE;
  139.     end if;
  140.  
  141.     -- Make sure user A does not register for this alert between the time 
  142.     -- user B signals the alert and the time user B commits.  Otherwise the
  143.     -- following sequence can occur: B signals (updates dbms_alert_info), A
  144.     -- registers (adds new entry to dbms_alert_info that B will not have
  145.     -- updated), A reads the data covered by the alert, B commits (causing
  146.     -- new data to be written), A does a wait.  A will not see B's signal.
  147.     status := dbms_lock.request(alert_hash(upper(name)), dbms_lock.x_mode,
  148.       dbms_lock.maxwait, release_on_commit => TRUE);
  149.     if status != 0 then
  150.       raise_application_error(-20000,
  151.         'ORU-10002: lock request error, status: ' || to_char(status));
  152.     end if;
  153.  
  154.     insert into dbms_alert_info values (upper(register.name), this_session_id,
  155.       'N', NULL);
  156.     commit;
  157.     
  158.   exception 
  159.     when dup_val_on_index then commit; -- commit to release the lock
  160.   end;
  161.  
  162.   procedure remove(name in varchar2) is
  163.   begin
  164.     if instantiating_pkg then
  165.       removeall;
  166.       instantiating_pkg := FALSE;
  167.     end if;
  168.  
  169.     delete from dbms_alert_info
  170.     where name  = upper(remove.name)
  171.       and sid   = this_session_id;
  172.     commit;
  173.   end;
  174.  
  175.   procedure pipe_wait(maxtime number, cumtime in out number) is
  176.     status integer;
  177.     tmo    number := maxtime;
  178.   begin
  179.     -- the time to wait is:
  180.     --    if running in parallel mode then we must effectively poll since pipes
  181.     --      do not work across instances yet.  This will be fixed when
  182.     --      pipes are upgraded to work in parallel mode.  So wait for 'p_int'.
  183.     --    if not parallel mode then don't
  184.     -- if parallel mode then do polling loop since pipes do not
  185.     -- yet work parallel mode
  186.     if parallel then
  187.       tmo := minimum(tmo, p_int);
  188.     end if;
  189.     if tmo = maxwait then
  190.       tmo := dbms_pipe.maxwait; -- map to dbms_pipe's idea of maxwait
  191.     end if;
  192.  
  193.     status := dbms_pipe.receive_message(sigpipe, tmo);
  194.     if status = 1 then
  195.       cumtime := cumtime + tmo;
  196.       return;
  197.     end if;
  198.     if status <> 0 then
  199.       raise_application_error(-20000, 'ORU-10015: error:' || to_char(status)
  200.         || ' waiting for pipe message.');
  201.     end if;
  202.     return;
  203.   end;
  204.  
  205.   -- optimistic pass for waitany.  keeps from waiting on a pending 
  206.   -- transaction if there exists some other, committed, alert.
  207.   procedure optimistic(
  208.     name    out varchar2, 
  209.     message out varchar2,
  210.     status  out integer)
  211.   is
  212.     lockid  integer;
  213.     lstatus integer;
  214.     cursor  c1 is 
  215.               select name from dbms_alert_info
  216.               where sid = this_session_id
  217.               and   changed = 'Y';
  218.   begin
  219.     status := 1;
  220.     for rec in c1 loop
  221.       lockid := alert_hash(rec.name);
  222.       lstatus := dbms_lock.request(lockid, dbms_lock.sx_mode, timeout => 0,
  223.         release_on_commit => TRUE);
  224.       if lstatus <> 1 then
  225.         if lstatus <> 0 then
  226.           raise_application_error(-20000, 'ORU-10019: error ' || 
  227.             to_char(lstatus) || ' on lock request.');
  228.         end if;
  229.         update dbms_alert_info set changed = 'N'
  230.           where sid = this_session_id
  231.           and   name = rec.name;
  232.         select message into message from dbms_alert_info
  233.           where sid = this_session_id
  234.           and   name = rec.name;
  235.         commit;
  236.         dbms_pipe.purge(sigpipe); -- just to avoid unnecessary work next time
  237.         name := rec.name;
  238.         status := 0;
  239.         return;
  240.       end if;
  241.     end loop;
  242.     return;
  243.   end;
  244.  
  245.   procedure waitany(
  246.     name    out varchar2, 
  247.     message out varchar2, 
  248.     status  out integer,
  249.     timeout in  number    default maxwait)
  250.   is
  251.     waitime  number        := 0;
  252.     cumtime  number        := 0;
  253.     lockid   integer;
  254.     st       integer;
  255.     lstatus  integer;
  256.     timedout boolean;
  257.     changed  varchar2(1);
  258.     foundone boolean;
  259.     cursor   c1 is
  260.                select name from dbms_alert_info
  261.                where sid = this_session_id;
  262.   begin
  263.     if instantiating_pkg then
  264.       removeall;
  265.       instantiating_pkg := FALSE;
  266.     end if;
  267.  
  268.     optimistic(name, message, st);
  269.     if st = 0 then
  270.       status := st;
  271.       return;
  272.     end if;
  273.     waitime := 1;
  274.     cumtime := 0;
  275.     loop
  276.       timedout := FALSE;
  277.       foundone := FALSE;
  278.       for rec in c1 loop
  279.         foundone := TRUE;
  280.         lockid := alert_hash(rec.name);
  281.         lstatus := dbms_lock.request(lockid, dbms_lock.sx_mode, waitime,
  282.           release_on_commit => TRUE);
  283.         if lstatus = 1 then  -- timed out
  284.           optimistic(name, message, st); -- see if anyone else committed
  285.                                          -- in the meantime...
  286.           if st = 0 then -- someone *did* commit, so alert happened...
  287.             status := 0;
  288.             return;
  289.           end if;
  290.           cumtime := cumtime + waitime;
  291.           if cumtime >= timeout then
  292.             status := 1;   -- exceeded caller-specified timeout
  293.             return;
  294.           end if;
  295.           timedout := TRUE;
  296.           goto continue;
  297.         elsif lstatus <> 0 then
  298.           raise_application_error(-20000,
  299.             'ORU-10020: error ' || to_char(lstatus) || ' on lock request.');
  300.         else
  301.           -- now that we have the row covered by a lock we can select
  302.           -- the changed and message columns from it.
  303.           select changed, message into changed, message from dbms_alert_info
  304.             where sid = this_session_id
  305.             and   name = rec.name;
  306.           if changed = 'Y' then  -- alert occurred
  307.             update dbms_alert_info set changed = 'N'
  308.               where sid = this_session_id
  309.               and   name = rec.name;
  310.             commit;
  311.  
  312.             name := rec.name;
  313.             status := 0;
  314.             dbms_pipe.purge(sigpipe);
  315.             return;
  316.           end if;
  317.           lstatus := dbms_lock.release(lockid);
  318.         end if;
  319.         <<continue>>
  320.         null; -- there is no 'continue' stmt in pl/sql
  321.       end loop;
  322.       if not foundone then
  323.         raise_application_error(-20000,
  324.           'ORU-10024: there are no alerts registered.');
  325.       end if;
  326.  
  327.       if timedout then
  328.         waitime := minimum(waitime*2, 32); -- do exponential backoff, max at 32
  329.         waitime := minimum(waitime, timeout-cumtime);
  330.       else
  331.         -- nothing to wait on so wait on pipe
  332.         pipe_wait(timeout-cumtime, cumtime);
  333.       end if;
  334.       if cumtime >= timeout then
  335.         status := 1;
  336.         return;
  337.       end if;
  338.  
  339.     end loop;
  340.   end;
  341.  
  342.   procedure waitone(
  343.     name    in  varchar2, 
  344.     message out varchar2, 
  345.     status  out integer,
  346.     timeout in  number    default maxwait)
  347.   is
  348.     cumtime number  := 0;
  349.     lockid  integer := alert_hash(upper(name));
  350.     lstatus integer;
  351.   begin
  352.     if instantiating_pkg then
  353.       removeall;
  354.       instantiating_pkg := FALSE;
  355.     end if;
  356.  
  357.     loop
  358.       lstatus := dbms_lock.request(lockid, dbms_lock.sx_mode, timeout-cumtime,
  359.         release_on_commit => TRUE);
  360.       if lstatus = 1 then
  361.         status := 1;
  362.         return;
  363.       end if;
  364.       if lstatus = 4 then
  365.         raise_application_error(-20000,
  366.       'ORU-10037: attempting to wait on uncommitted signal from same session');
  367.       end if;
  368.       if lstatus <> 0 then
  369.         raise_application_error(-20000,
  370.           'ORU-10023: error ' || to_char(lstatus) || ' on lock request.');
  371.       end if;
  372.       update dbms_alert_info set changed = 'N'
  373.       where name    = upper(waitone.name)
  374.         and sid     = this_session_id
  375.         and changed = 'Y';
  376.       if sql%rowcount != 0 then
  377.         select message into message from dbms_alert_info
  378.         where name    = upper(waitone.name)
  379.           and sid     = this_session_id;
  380.         commit;
  381.  
  382.         dbms_pipe.purge(sigpipe); -- discard unneeded msgs
  383.         status := 0;
  384.         return;
  385.       end if;
  386.       lstatus := dbms_lock.release(lockid);
  387.  
  388.       -- wait for timeout, or until a message arrives on the pipe.  If
  389.       -- parallel mode then don't wait longer than p_int.
  390.       pipe_wait(timeout, cumtime);
  391.       if cumtime >= timeout then
  392.         status := 1;
  393.         return;
  394.       end if;
  395.     end loop;
  396.   end;
  397.  
  398.   procedure signal_pipe(pipename varchar2) is
  399.     msgid    varchar2(40);
  400.     tmpmsgid varchar2(40);
  401.     status   integer;
  402.   begin
  403.     msgid := this_session_id || ':' || to_char(msgseq);
  404.     msgseq := msgseq + 1;
  405.     dbms_pipe.pack_message(msgid);
  406.     status := dbms_pipe.send_message(pipename);
  407.     if status <> 0 then
  408.       raise_application_error(-20000,
  409.         'ORU-10016: error:' || to_char(status) || ' sending on pipe ' || 
  410.           pipename);
  411.     end if;
  412.  
  413.     -- remove dup signals from the pipe
  414.     status := dbms_pipe.receive_message(pipename, 0);
  415.     if status = 1 then
  416.       -- receiver has already taken signal off of pipe (or pipe is busy)
  417.       return;
  418.     end if;
  419.     if status <> 0 then
  420.       raise_application_error(-20000, 
  421.         'ORU-10017: error:' || to_char(status) || ' receiving on pipe ' || 
  422.           pipename);
  423.     end if;
  424.     dbms_pipe.unpack_message(tmpmsgid);
  425.     if tmpmsgid = msgid then
  426.       -- it was our message so put it back on
  427.       dbms_pipe.pack_message(msgid);
  428.       status := dbms_pipe.send_message(pipename);
  429.       if status <> 0 then
  430.         raise_application_error(-20000, 
  431.           'ORU-10018: error:' || to_char(status) || ' sending on pipe ' || 
  432.             pipename);
  433.       end if;
  434.     end if;
  435.   end;
  436.  
  437.   procedure signal(name in varchar2, message in varchar2) is
  438.     status  integer;
  439.     cursor  c2(alertname varchar2) is
  440.               select sid from dbms_alert_info
  441.               where name = upper(alertname);
  442.   begin
  443.     status := dbms_lock.request(alert_hash(upper(name)), dbms_lock.s_mode, 
  444.       dbms_lock.maxwait, release_on_commit => TRUE);
  445.     -- status 4 means we already own this lock which happens if this alert
  446.     -- is signalled more than once during this transaction, or if we are
  447.     -- signalling multiple different alerts during this transaction and
  448.     -- there is a hash collision on the alert name.
  449.     if status != 0  and status != 4 then
  450.       raise_application_error(-20000,
  451.         'ORU-10001: lock request error, status: ' || to_char(status));
  452.     end if;
  453.  
  454.     -- We cannot add the clause "and changed = 'N'" since we need to guarantee
  455.     -- that at commit time changed is 'Y'.  If we add the clause then a
  456.     -- waiter can reset it to 'N' prior to our commit.
  457.     update dbms_alert_info set changed = 'Y', message = signal.message
  458.     where name = upper(signal.name);
  459.  
  460.     -- signal all interested sessions that "something has happened".
  461.     -- The sessions need to wakeup and then check dbms_alert_info
  462.     -- to find out what, if anything, did happen since this transaction
  463.     -- could rollback after the message is sent.
  464.     for rec in c2(name) loop
  465.       -- make sure requesting session is still alive.  cleanup if not.
  466.       status := dbms_lock.request(session_hash(rec.sid), dbms_lock.sx_mode, 
  467.         timeout => 0, release_on_commit => TRUE);
  468.       if status = 0 then
  469.         -- ooops, we should not have been able to aquire this lock.  
  470.         -- The session must be dead.  cleanup
  471.     
  472.         -- don't delete from dbms_alert_info here cause could cause 
  473.         -- deadlocks.  Just wait for next register call to get
  474.         -- dbms_alert_info cleaned up.
  475.         dbms_pipe.purge('ORA$ALERT$' || rec.sid);
  476.         status := dbms_lock.release(session_hash(rec.sid));
  477.       else
  478.         -- 1 is timeout, 4 is we already own this lock.  4 can happen if
  479.         -- this session is registered for this alert.
  480.         if status != 1 and status != 4 then
  481.           raise_application_error(-20000,
  482.             'ORU-10022: lock request error, status: ' || to_char(status));
  483.         end if;
  484.  
  485.         -- signal on pipe even if parallel mode since the waiter might
  486.         -- be on this instance. 
  487.         signal_pipe('ORA$ALERT$' || rec.sid);
  488.       end if;
  489.  
  490.     end loop;
  491.   end;
  492.  
  493.   procedure removeall is
  494.   begin
  495.     delete from dbms_alert_info where sid = this_session_id;
  496.     dbms_pipe.purge(sigpipe);
  497.     commit;
  498.   end;
  499.  
  500. end;
  501. /
  502. drop public synonym dbms_alert
  503. /
  504. create public synonym dbms_alert for dbms_alert
  505. /
  506.