Sunday, April 6, 2014

Analysing Parallel Execution Skew - Without Diagnostics / Tuning Pack License

This is the third part of the video tutorial "Analysing Parallel Execution Skew". In this part I show how to analyse a parallel SQL execution regarding Parallel Execution Skew.

If you don't have a Diagnostics / Tuning Pack license the options you have for doing that are quite limited, and the approach, as demonstrated in the tutorial, has several limitations and shortcomings.

Here is the video:



If you want to reproduce or play around with the examples shown in the tutorial here is the script for creating the tables and running the queries / DML commands used in the tutorial. A shout goes out to Christo Kutrovsky at Pythian who I think was the one who inspired the beautified version on V$PQ_TQSTAT.

---------------------
-- Links for S-ASH --
---------------------
--
-- http://www.perfvision.com/ash.php
-- http://www.pythian.com/blog/trying-out-s-ash/
-- http://sourceforge.net/projects/orasash/files/v2.3/
-- http://sourceforge.net/projects/ashv/
---------------------

-- Table creation
set echo on timing on time on

drop table t_1;

purge table t_1;

drop table t_2;

purge table t_2;

drop table t_1_part;

purge table t_1_part;

drop table t_2_part;

purge table t_2_part;

drop table t1;

purge table t1;

drop table t2;

purge table t2;

drop table t3;

purge table t3;

drop table t4;

purge table t4;

drop table t5;

purge table t5;

drop table x;

purge table x;

create table t1
as
select  /*+ use_nl(a b) */
        (rownum * 2) as id
      , rownum as id2
      , rpad('x', 100) as filler
from
        (select /*+ cardinality(1000) */ * from dual
connect by
        level <= 1000) a, (select /*+ cardinality(2) */ * from dual connect by level <= 2) b
;

exec dbms_stats.gather_table_stats(null, 't1')

alter table t1 cache;

create table t2
compress
as
select
        (rownum * 2) + 1 as id
      , mod(rownum, 2000) + 1 as id2
      , rpad('x', 100) as filler
from
        (select /*+ cardinality(1000000) */ * from dual
connect by
        level <= 1000000) a, (select /*+ cardinality(2) */ * from dual connect by level <= 2) b
;

exec dbms_stats.gather_table_stats(null, 't2')

alter table t2 cache;

create table t3
as
select  /*+ use_nl(a b) */
        (rownum * 2) as id
      , rownum as id2
      , rpad('x', 100) as filler
from
        (select /*+ cardinality(1000) */ * from dual
connect by
        level <= 1000) a, (select /*+ cardinality(2) */ * from dual connect by level <= 2) b
;

exec dbms_stats.gather_table_stats(null, 't3')

alter table t3 cache;

create table t4
compress
as
select
        (rownum * 2) + 1 as id
      , mod(rownum, 2000) + 1 as id2
      , rpad('x', 100) as filler
from
        (select /*+ cardinality(1000000) */ * from dual
connect by
        level <= 1000000) a, (select /*+ cardinality(2) */ * from dual connect by level <= 2) b
;

exec dbms_stats.gather_table_stats(null, 't4')

alter table t4 cache;

create table t5
as
select  /*+ use_nl(a b) */
        (rownum * 2) as id
      , rownum as id2
      , rpad('x', 100) as filler
from
        (select /*+ cardinality(1000) */ * from dual
connect by
        level <= 1000) a, (select /*+ cardinality(2) */ * from dual connect by level <= 2) b
;

exec dbms_stats.gather_table_stats(null, 't5')

alter table t5 cache;

create table x
compress
as
select * from t2
where 1 = 2;

create unique index x_idx1 on x (id);

alter table t1 parallel 2;

alter table t2 parallel 2;

alter table t3 parallel 15;

alter table t4 parallel 15;

alter table t5 parallel 15;

create table t_1
compress
as
select  /*+ use_nl(a b) */
        rownum as id
      , rpad('x', 100) as filler
from
        (select /*+ cardinality(1e5) */ * from dual
connect by
        level <= 1e5) a, (select /*+ cardinality(20) */ * from dual connect by level <= 20) b
;

exec dbms_stats.gather_table_stats(null, 't_1')

create table t_2
compress
as
select
        rownum as id
      , case when rownum <= 5e5 then mod(rownum, 2e6) + 1 else 1 end as fk_id_skew
      , rpad('x', 100) as filler
from
        (select /*+ cardinality(1e5) */ * from dual
connect by
        level <= 1e5) a, (select /*+ cardinality(20) */ * from dual connect by level <= 20) b
;

exec dbms_stats.gather_table_stats(null, 't_2', method_opt=>'for all columns size 1', no_invalidate=>false)

alter table t_1 parallel 8 cache;

alter table t_2 parallel 8 cache;

create table t_1_part
partition by hash(id) partitions 8
compress
as
select  /*+ use_nl(a b) */
        rownum as id
      , rpad('x', 100) as filler
from
        (select /*+ cardinality(1e5) */ * from dual
connect by
        level <= 1e5) a, (select /*+ cardinality(20) */ * from dual connect by level <= 20) b
;

exec dbms_stats.gather_table_stats(null, 't_1_part')

create table t_2_part
partition by hash(fk_id_skew) partitions 8
compress
as
select
        rownum as id
      , case when rownum <= 5e5 then mod(rownum, 2e6) + 1 else 1 end as fk_id_skew
      , rpad('x', 100) as filler
from
        (select /*+ cardinality(1e5) */ * from dual
connect by
        level <= 1e5) a, (select /*+ cardinality(20) */ * from dual connect by level <= 20) b
;

exec dbms_stats.gather_table_stats(null, 't_2_part', method_opt=>'for all columns size 1', no_invalidate=>false)

alter table t_1_part parallel 8 cache;

alter table t_2_part parallel 8 cache;

---------------------------------------------------------------
-- Single DFO tree (with Parallel Execution Skew), many DFOs --
---------------------------------------------------------------

set echo on timing on time on verify on

define num_cpu = "14"

alter session set workarea_size_policy = manual;

alter session set sort_area_size = 200000000;

alter session set sort_area_size = 200000000;

alter session set hash_area_size = 200000000;

alter session set hash_area_size = 200000000;

select
        max(t1_id)
      , max(t1_filler)
      , max(t2_id)
      , max(t2_filler)
      , max(t3_id)
      , max(t3_filler)
from    (
          select  /*+ monitor
                     no_merge
                     no_merge(v_1)
                     no_merge(v_5)
                     parallel(t1 &num_cpu)
                     PQ_DISTRIBUTE(T1 HASH HASH)
                     PQ_DISTRIBUTE(V_5 HASH HASH)
                     leading (v_1 v_5 t1)
                     use_hash(v_1 v_5 t1)
                     swap_join_inputs(t1)
                  */
                  t1.id     as t1_id
                , regexp_replace(v_5.t3_filler, '^\s+([[:alnum:]]+)\s+$', lpad('\1', 10), 1, 1, 'c') as t1_filler
                , v_5.*
          from    (
                    select  /*+ parallel(t2 &num_cpu)
                                parallel(t3 &num_cpu)
                                leading(t3 t2)
                                use_hash(t3 t2)
                                swap_join_inputs(t2)
                                PQ_DISTRIBUTE(T2 HASH HASH)
                            */
                            t2.id     as t2_id
                          , t2.filler as t2_filler
                          , t2.id2    as t2_id2
                          , t3.id     as t3_id
                          , t3.filler as t3_filler
                    from
                            t1 t2
                          , t2 t3
                    where
                            t3.id2 = t2.id2 (+)
                    and     regexp_replace(t3.filler, '^([[:alnum:]]+)\s+$', lpad('\1', 10), 1, 1, 'c') = regexp_replace(t2.filler (+), '^([[:alnum:]]+)\s+$', lpad('\1', 10), 1, 1, 'c')
                    and     mod(t3.id2, 3) = 0
                  ) v_1
                , (
                    select  /*+ parallel(t2 &num_cpu)
                                parallel(t3 &num_cpu)
                                leading(t3 t2)
                                use_hash(t3 t2)
                                swap_join_inputs(t2)
                                PQ_DISTRIBUTE(T2 HASH HASH)
                            */
                            t2.id     as t2_id
                          , t2.filler as t2_filler
                          , t2.id2    as t2_id2
                          , t3.id     as t3_id
                          , t3.filler as t3_filler
                    from
                            t1 t2
                          , t2 t3
                    where
                            t3.id = t2.id (+)
                    and     regexp_replace(t3.filler, '^([[:alnum:]]+)\s+$', lpad('\1', 10), 1, 1, 'c') = regexp_replace(t2.filler (+), '^([[:alnum:]]+)\s+$', lpad('\1', 10), 1, 1, 'c')
                    and     mod(t3.id2, 3) = 0
                  ) v_5
                , t1
          where
                  v_1.t3_id = v_5.t3_id
          and     v_5.t2_id2 = t1.id2 (+) + 2001
          and     regexp_replace(v_5.t3_filler, '^([[:alnum:]]+)\s+$', lpad('\1', 10), 1, 1, 'c') = regexp_replace(t1.filler (+), '^([[:alnum:]]+)\s+$', lpad('\1', 10), 1, 1, 'c')
        )
;

break on dfo_number nodup on tq_id nodup on server_type skip 1 nodup on instance nodup

-- compute sum label Total of num_rows on server_type

select
        /*dfo_number
      , */tq_id
      , cast(server_type as varchar2(10)) as server_type
      , instance
      , cast(process as varchar2(8)) as process
      , num_rows
      , round(ratio_to_report(num_rows) over (partition by dfo_number, tq_id, server_type) * 100) as "%"
      , cast(rpad('#', round(num_rows * 10 / nullif(max(num_rows) over (partition by dfo_number, tq_id, server_type), 0)), '#') as varchar2(10)) as graph
      , round(bytes / 1024 / 1024) as mb
      , round(bytes / nullif(num_rows, 0)) as "bytes/row"
from
        v$pq_tqstat
order by
        dfo_number
      , tq_id
      , server_type desc
      , instance
      , process
;

---------------------------------------------------------------------------------------------------
-- Same statement with Parallel TEMP TABLE TRANSFORMATION, V$PQ_TQSTAT shows useless information --
---------------------------------------------------------------------------------------------------

set echo on timing on time on verify on

define num_cpu = "14"

alter session set workarea_size_policy = manual;

alter session set sort_area_size = 200000000;

alter session set sort_area_size = 200000000;

alter session set hash_area_size = 200000000;

alter session set hash_area_size = 200000000;

with result as
(
select /*+ materialize
           monitor
           no_merge
           no_merge(v_1)
           no_merge(v_5)
           parallel(t1 &num_cpu)
           PQ_DISTRIBUTE(T1 HASH HASH)
           PQ_DISTRIBUTE(V_1 HASH HASH)
           PQ_DISTRIBUTE(V_5 HASH HASH)
           leading (v_1 v_5 t1)
           use_hash(v_1 v_5 t1)
           swap_join_inputs(t1)
       */
       t1.id     as t1_id
     , regexp_replace(v_5.t3_filler, '^\s+([[:alnum:]]+)\s+$', lpad('\1', 10), 1, 1, 'c') as t1_filler
     , v_5.*
from   (
  select /*+ parallel(t2 &num_cpu) parallel(t3 &num_cpu) leading(t3 t2) use_hash(t3 t2) swap_join_inputs(t2) PQ_DISTRIBUTE(T2 HASH HASH) */
        t2.id     as t2_id
      , t2.filler as t2_filler
      , t2.id2    as t2_id2
      , t3.id     as t3_id
      , t3.filler as t3_filler
  from
        t1 t2
      , t2 t3
  where
        t3.id2 = t2.id2 (+)
  and   regexp_replace(t3.filler, '^([[:alnum:]]+)\s+$', lpad('\1', 10), 1, 1, 'c') = regexp_replace(t2.filler (+), '^([[:alnum:]]+)\s+$', lpad('\1', 10), 1, 1, 'c')
  and   mod(t3.id2, 3) = 0
)
 v_1
     , (
  select /*+ parallel(t2 &num_cpu) parallel(t3 &num_cpu) leading(t3 t2) use_hash(t3 t2) swap_join_inputs(t2) PQ_DISTRIBUTE(T2 HASH HASH) */
        t2.id     as t2_id
      , t2.filler as t2_filler
      , t2.id2    as t2_id2
      , t3.id     as t3_id
      , t3.filler as t3_filler
  from
        t1 t2
      , t2 t3
  where
        t3.id = t2.id (+)
  and   regexp_replace(t3.filler, '^([[:alnum:]]+)\s+$', lpad('\1', 10), 1, 1, 'c') = regexp_replace(t2.filler (+), '^([[:alnum:]]+)\s+$', lpad('\1', 10), 1, 1, 'c')
  and   mod(t3.id2, 3) = 0
) v_5
     , t1
where
       v_1.t3_id = v_5.t3_id
and    v_5.t2_id2 = t1.id2 (+) + 2001
and    regexp_replace(v_5.t3_filler, '^([[:alnum:]]+)\s+$', lpad('\1', 10), 1, 1, 'c') = regexp_replace(t1.filler (+), '^([[:alnum:]]+)\s+$', lpad('\1', 10), 1, 1, 'c')
)
select max(t1_id), max(t1_filler), max(t2_id), max(t2_filler), max(t3_id), max(t3_filler) from
result;

break on dfo_number nodup on tq_id nodup on server_type skip 1 nodup on instance nodup

-- compute sum label Total of num_rows on server_type

select
        /*dfo_number
      , */tq_id
      , cast(server_type as varchar2(10)) as server_type
      , instance
      , cast(process as varchar2(8)) as process
      , num_rows
      , round(ratio_to_report(num_rows) over (partition by dfo_number, tq_id, server_type) * 100) as "%"
      , cast(rpad('#', round(num_rows * 10 / nullif(max(num_rows) over (partition by dfo_number, tq_id, server_type), 0)), '#') as varchar2(10)) as graph
      , round(bytes / 1024 / 1024) as mb
      , round(bytes / nullif(num_rows, 0)) as "bytes/row"
from
        v$pq_tqstat
order by
        dfo_number
      , tq_id
      , server_type desc
      , instance
      , process
;

--------------------------------------------------------------------------------------------------
-- This construct results in misleading information from V$PQ_TQSTAT (actually a complete mess) --
--------------------------------------------------------------------------------------------------

set echo on timing on time on

alter session enable parallel dml;

truncate table x;

insert  /*+ append parallel(x 4) */ into x
select  /*+ leading(v1 v2) optimizer_features_enable('11.2.0.1') */
        v_1.id
      , v_1.id2
      , v_1.filler
from    (
           select
                   id
                 , id2
                 , filler
           from    (
                      select  /*+ parallel(t2 4) no_merge */
                              rownum as id
                            , t2.id2
                            , t2.filler
                      from
                              t2
                      where
                              mod(t2.id2, 3) = 0
                      and     regexp_replace(t2.filler, '^([[:alnum:]]+)\s+$', lpad('\1', 10), 1, 1, 'c') = regexp_replace(t2.filler, '^([[:alnum:]]+)\s+$', lpad('\1', 10), 1, 1, 'i')
                   ) v1
        ) v_1
      , (
          select
                   id
                 , id2
                 , filler
          from     (
                     select  /*+ parallel(t2 8) no_merge */
                             rownum as id
                           , t2.id2
                           , t2.filler
                     from
                             t2
                     where
                             mod(t2.id2, 3) = 0
                     and     regexp_replace(t2.filler, '^([[:alnum:]]+)\s+$', lpad('\1', 10), 1, 1, 'c') = regexp_replace(t2.filler, '^([[:alnum:]]+)\s+$', lpad('\1', 10), 1, 1, 'i')
                   ) v2
        ) v_2
where
        v_1.id = v_2.id
and     v_1.filler = v_2.filler
;

-- Parallel DML requires a COMMIT before querying V$PQ_TQSTAT
commit;

break on dfo_number nodup on tq_id nodup on server_type skip 1 nodup on instance nodup

compute sum label Total of num_rows on server_type

select
        dfo_number
      , tq_id
      , cast(server_type as varchar2(10)) as server_type
      , instance
      , cast(process as varchar2(8)) as process
      , num_rows
      , round(ratio_to_report(num_rows) over (partition by dfo_number, tq_id, server_type) * 100) as "%"
      , cast(rpad('#', round(num_rows * 10 / nullif(max(num_rows) over (partition by dfo_number, tq_id, server_type), 0)), '#') as varchar2(10)) as graph
      , round(bytes / 1024 / 1024) as mb
      , round(bytes / nullif(num_rows, 0)) as "bytes/row"
from
        v$pq_tqstat
order by
        dfo_number
      , tq_id
      , server_type desc
      , instance
      , process
;

----------------------------------------------------------------------
-- Single DFO tree (with Parallel Execution Skew, almost no impact) --
----------------------------------------------------------------------

set echo on timing on time on

alter session set workarea_size_policy = manual;

alter session set sort_area_size = 500000000;

alter session set sort_area_size = 500000000;

alter session set hash_area_size = 500000000;

alter session set hash_area_size = 500000000;

select  /*+ leading(v1)
            use_hash(t_1)
            no_swap_join_inputs(t_1)
            pq_distribute(t_1 hash hash)
        */
        max(t_1.filler)
      , max(v1.t_1_filler)
      , max(v1.t_2_filler)
from
        t_1
      , (
          select  /*+ no_merge
                      leading(t_1 t_2)
                      use_hash(t_2)
                      no_swap_join_inputs(t_2)
                      pq_distribute(t_2 hash hash) */
                  t_1.id as t_1_id
                , t_1.filler as t_1_filler
                , t_2.id as t_2_id
                , t_2.filler as t_2_filler
          from    t_1
                , t_2
          where
                  t_2.fk_id_skew = t_1.id
        ) v1
where
        v1.t_2_id = t_1.id
and     regexp_replace(v1.t_2_filler, '^\s+([[:alnum:]]+)\s+$', lpad('\1', 10), 1, 1, 'c') >= regexp_replace(t_1.filler, '^\s+([[:alnum:]]+)\s+$', lpad('\1', 10), 1, 1, 'c')
and     regexp_replace(v1.t_2_filler, '^\s+([[:alnum:]]+)\s+$', lpad('\1', 10), 1, 1, 'i') >= regexp_replace(t_1.filler, '^\s+([[:alnum:]]+)\s+$', lpad('\1', 10), 1, 1, 'i')
;

break on dfo_number nodup on tq_id nodup on server_type skip 1 nodup on instance nodup

-- compute sum label Total of num_rows on server_type

select
        /*dfo_number
      , */tq_id
      , cast(server_type as varchar2(10)) as server_type
      , instance
      , cast(process as varchar2(8)) as process
      , num_rows
      , round(ratio_to_report(num_rows) over (partition by dfo_number, tq_id, server_type) * 100) as "%"
      , cast(rpad('#', round(num_rows * 10 / nullif(max(num_rows) over (partition by dfo_number, tq_id, server_type), 0)), '#') as varchar2(10)) as graph
      , round(bytes / 1024 / 1024) as mb
      , round(bytes / nullif(num_rows, 0)) as "bytes/row"
from
        v$pq_tqstat
order by
        dfo_number
      , tq_id
      , server_type desc
      , instance
      , process
;

--------------------------------------------------------------------------------------------------------------------------------
-- Full Partition Wise Join with partition skew - V$PQ_TQSTAT is of no help, since no redistribution takes place (single DFO) --
--------------------------------------------------------------------------------------------------------------------------------

set echo on timing on time on

alter session set workarea_size_policy = manual;

alter session set sort_area_size = 500000000;

alter session set sort_area_size = 500000000;

alter session set hash_area_size = 500000000;

alter session set hash_area_size = 500000000;

select count(t_2_filler) from (
select  /*+ monitor
            leading(t_1 t_2)
            use_hash(t_2)
            no_swap_join_inputs(t_2)
            pq_distribute(t_2 none none)
        */
        t_1.id as t_1_id
      , t_1.filler as t_1_filler
      , t_2.id as t_2_id
      , t_2.filler as t_2_filler
from    t_1_part t_1
      , t_2_part t_2
where
        t_2.fk_id_skew = t_1.id
and     regexp_replace(t_2.filler, '^\s+([[:alnum:]]+)\s+$', lpad('\1', 10), 1, 1, 'c') >= regexp_replace(t_1.filler, '^\s+([[:alnum:]]+)\s+$', lpad('\1', 10), 1, 1, 'c')
);

break on dfo_number nodup on tq_id nodup on server_type skip 1 nodup on instance nodup

-- compute sum label Total of num_rows on server_type

select
        /*dfo_number
      , */tq_id
      , cast(server_type as varchar2(10)) as server_type
      , instance
      , cast(process as varchar2(8)) as process
      , num_rows
      , round(ratio_to_report(num_rows) over (partition by dfo_number, tq_id, server_type) * 100) as "%"
      , cast(rpad('#', round(num_rows * 10 / nullif(max(num_rows) over (partition by dfo_number, tq_id, server_type), 0)), '#') as varchar2(10)) as graph
      , round(bytes / 1024 / 1024) as mb
      , round(bytes / nullif(num_rows, 0)) as "bytes/row"
from
        v$pq_tqstat
order by
        dfo_number
      , tq_id
      , server_type desc
      , instance
      , process
;