代码语言
.
CSharp
.
JS
Java
Asp.Net
C
MSSQL
PHP
Css
PLSQL
Python
Shell
EBS
ASP
Perl
ObjC
VB.Net
VBS
MYSQL
GO
Delphi
AS
DB2
Domino
Rails
ActionScript
Scala
代码分类
文件
系统
字符串
数据库
网络相关
图形/GUI
多媒体
算法
游戏
Jquery
Extjs
Android
HTML5
菜单
网页交互
WinForm
控件
企业应用
安全与加密
脚本/批处理
开放平台
其它
【
PLSQL
】
ETL调度
作者:
ynihui
/ 发布于
2013/11/21
/
1494
ETL调度
CREATE OR REPLACE PACKAGE PKG_ETL_CTL IS -- 调用主程序,调用存储过程 PROCEDURE SP_EXEC_PROC(I_JOB_ID NUMBER); -- 创建新的时间周期,运行数据周期作业及作业流状态处理 PROCEDURE SP_FLOW_RUN_DEAL; -- 创建新的时间周期 PROCEDURE SP_FLOW_CREATE_NEW_PERIOD; -- 运行数据周期内的作业流及作业 PROCEDURE SP_FLOW_RUN_NEW_PERIOD; -- 同步作业流运行状态 PROCEDURE SP_FLOW_RUN_STATUS; -- 失败作业流及作业重置为未处理 PROCEDURE SP_FLOW_RUN_ERROR_RESET; -- 作业状态更新 PROCEDURE SP_JOB_RUN_STATUS ( I_JOB_ID NUMBER ,I_ORG_ID VARCHAR2 ,I_JOB_RUN_STATUS VARCHAR2 ,I_JOB_RUN_INFO VARCHAR2 ); -- 作业流日志处理 PROCEDURE SP_ETL_FLOW_LOG_INFO(I_FLOW_ID NUMBER); -- 作业日志处理 PROCEDURE SP_ETL_JOB_LOG_INFO(I_JOB_ID NUMBER); PROCEDURE SP_INSERT_MONITOR_SMS ( O_RESULT_FLAG OUT VARCHAR2 /*过程执行结果返回给调度 9 成功 2 失败*/ ,O_RESULT_MSG OUT VARCHAR2 /*过程执行结果信息返回给调度*/ ); PROCEDURE SP_SEND_MONITOR_SMS; -- 获取作业流前置依赖 FUNCTION FN_GET_FLOW_DEPEND(I_FLOW_ID NUMBER) RETURN VARCHAR2; -- 获取作业前置依赖 FUNCTION FN_GET_JOB_DEPEND(I_JOB_ID NUMBER) RETURN VARCHAR2; -- 获取作业流下属作业运行状态 FUNCTION FN_GET_FLOW_RUN_STATUS(I_FLOW_ID NUMBER) RETURN VARCHAR2; -- 获取下个数据日期 FUNCTION FN_GET_NEXT_DATA_TIME(I_FLOW_ID NUMBER) RETURN DATE; -- 获取上级作业流运行状态 FUNCTION FN_GET_SUPER_FLOW_RUN_STATUS(I_FLOW_ID NUMBER) RETURN VARCHAR2; -- 获取上级作业流数据开始时间 FUNCTION FN_GET_SUPER_DATA_START_TIME(I_FLOW_ID NUMBER) RETURN DATE; -- 获取上级作业流数据结束时间 FUNCTION FN_GET_SUPER_DATA_END_TIME(I_FLOW_ID NUMBER) RETURN DATE; -- 获取周期代码 FUNCTION FN_GET_CYC_CODE(I_FLOW_ID VARCHAR2) RETURN VARCHAR2; END PKG_ETL_CTL; / CREATE OR REPLACE PACKAGE BODY PKG_ETL_CTL IS /******************************************************************* 程序名 :SP_EXEC_PROC 创建人 : zhuyh 创建时间 : 2013/8/20 功能描述 : 调用主程序,调用存储过程 修改人 : 修改时间 : 修改原因 : *******************************************************************/ PROCEDURE SP_EXEC_PROC(I_JOB_ID NUMBER) IS VAR_PRO_NAME VARCHAR2(100); VAR_DATA_START_TIME VARCHAR2(20); VAR_DATA_END_TIME VARCHAR2(20); VAR_SQL VARCHAR2(4000); VAR_PARAMS VARCHAR2(1000); VAR_ORG_ID VARCHAR2(10); VAR_JOB_RUN_DESC VARCHAR2(100); VAR_JOB_ERR_DESC VARCHAR2(100); BEGIN -- 获取作业正在运行描述 SELECT T.ETL_PARA_VAL INTO VAR_JOB_RUN_DESC FROM ETL_CTL_PARA T WHERE UPPER(T.ETL_PARA_NAME) = 'ETL_JOB_RUN_DESC'; -- 获取作业运行失败描述 SELECT T.ETL_PARA_VAL INTO VAR_JOB_ERR_DESC FROM ETL_CTL_PARA T WHERE UPPER(T.ETL_PARA_NAME) = 'ETL_JOB_ERR_DESC'; -- 获取作业所调用的存储过程,数据开始时间,数据结束时间 SELECT T.ETL_JOB_PROC ,TO_CHAR(A.ETL_DATA_START_TIME, 'YYYYMMDDHH24MISS') ,TO_CHAR(A.ETL_DATA_END_TIME, 'YYYYMMDDHH24MISS') INTO VAR_PRO_NAME ,VAR_DATA_START_TIME ,VAR_DATA_END_TIME FROM ETL_CTL_JOB_INFO T ,ETL_JOB_RUN_STS A WHERE T.ETL_JOB_ID = A.ETL_JOB_ID AND T.ETL_JOB_ID = I_JOB_ID; -- 获取作业全部参数拼接到一起 FOR LOOP_PARAM IN (SELECT T.ETL_PARA_NAME ,DECODE(T.ETL_PARA_TYPE ,2 ,'TO_DATE(' || T.ETL_PARA_VAL || ',''YYYYMMDDHH24MISS'')' -- 日期类型参数转化成日期格式 ,T.ETL_PARA_VAL) ETL_PARA_VAL ,T.ETL_PARA_TYPE FROM ETL_JOB_PARA T WHERE T.ETL_JOB_ID = I_JOB_ID) LOOP -- 获取机构号 IF LOOP_PARAM.ETL_PARA_NAME = 'I_ORG_ID' THEN VAR_ORG_ID := LOOP_PARAM.ETL_PARA_VAL; END IF; -- 参数拼接 VAR_PARAMS := VAR_PARAMS || LOOP_PARAM.ETL_PARA_NAME || ' => ' || LOOP_PARAM.ETL_PARA_VAL || ','; END LOOP; -- 参数加上输出参数(存储过程运行结果和运行信息) VAR_PARAMS := UPPER(VAR_PARAMS) || 'O_RESULT_FLAG => LO_RESULT_FLAG,O_RESULT_MSG => LO_RESULT_MSG'; -- 参数替换为变量 VAR_PARAMS := REPLACE(VAR_PARAMS, '#$I_DATA_START_TIME#', VAR_DATA_START_TIME); VAR_PARAMS := REPLACE(VAR_PARAMS, '#$I_DATA_END_TIME#', VAR_DATA_END_TIME); -- 拼接存储过程进行调用 VAR_SQL := 'DECLARE LO_RESULT_FLAG VARCHAR2(10);LO_RESULT_MSG VARCHAR2(300);BEGIN ' || VAR_PRO_NAME || '(' || VAR_PARAMS || ');PKG_ETL_CTL.SP_JOB_RUN_STATUS(' || I_JOB_ID || ',''' || VAR_ORG_ID || ''', LO_RESULT_FLAG, LO_RESULT_MSG);END;'; -- 修改作业状态为正在运行 SP_JOB_RUN_STATUS(I_JOB_ID, VAR_ORG_ID, '1', VAR_JOB_RUN_DESC); -- 运行存储过程 EXECUTE IMMEDIATE VAR_SQL; EXCEPTION WHEN OTHERS THEN -- 运行出错,返回错误信息 SP_JOB_RUN_STATUS(I_JOB_ID, VAR_ORG_ID, '2', SQLERRM); END; /******************************************************************* 程序名 :SP_FLOW_RUN_DEAL 创建人 : zhuyh 创建时间 : 2013/8/20 功能描述 : 创建新的时间周期,运行数据周期作业及作业流状态处理 修改人 : 修改时间 : 修改原因 : *******************************************************************/ PROCEDURE SP_FLOW_RUN_DEAL IS BEGIN -- 创建新的时间周期 SP_FLOW_CREATE_NEW_PERIOD; -- 运行时间周期内的作业 SP_FLOW_RUN_NEW_PERIOD; -- 更新作业流状态 SP_FLOW_RUN_STATUS; -- 失败任务重置 SP_FLOW_RUN_ERROR_RESET; END; /******************************************************************* 程序名 :SP_FLOW_CREATE_NEW_PERIOD 创建人 : zhuyh 创建时间 : 2013/8/20 功能描述 : 生成新的数据周期运行新周期的数据 修改人 : 修改时间 : 修改原因 : *******************************************************************/ PROCEDURE SP_FLOW_CREATE_NEW_PERIOD IS DTE_DATA_NEXT_TIME DATE; DTE_DATA_START_TIME DATE; DTE_DATA_END_TIME DATE; BEGIN FOR LOOP_FLOW IN (SELECT T.ETL_FLOW_ID ,A.ETL_NEXT_EXPIRY_TIME FROM ETL_CTL_JOB_FLOW T ,ETL_FLOW_RUN_STS A WHERE T.ETL_FLOW_ID = A.ETL_FLOW_ID AND T.ETL_FLOW_LEVEL = 1 AND T.ETL_FLOW_STATUS = 1) LOOP DTE_DATA_NEXT_TIME := FN_GET_NEXT_DATA_TIME(LOOP_FLOW.ETL_FLOW_ID); IF DTE_DATA_NEXT_TIME <= LOOP_FLOW.ETL_NEXT_EXPIRY_TIME THEN UPDATE ETL_FLOW_RUN_STS T SET T.ETL_NEXT_DATA_TIME = DTE_DATA_NEXT_TIME WHERE T.ETL_FLOW_ID = LOOP_FLOW.ETL_FLOW_ID; COMMIT; END IF; END LOOP; FOR LOOP_FLOW IN (SELECT T.ETL_FLOW_ID ,T.ETL_DATA_SUCC_TIME ,T.ETL_DATA_START_TIME ,T.ETL_DATA_END_TIME ,T.ETL_NEXT_DATA_TIME ,T.ETL_FLOW_RUN_STATUS ,A.ETL_CYC_CODE ,A.ETL_FLOW_LEVEL ,A.ETL_FLOW_STATUS FROM ETL_FLOW_RUN_STS T ,ETL_CTL_JOB_FLOW A WHERE T.ETL_FLOW_ID = A.ETL_FLOW_ID AND A.ETL_FLOW_STATUS = 1 ORDER BY A.ETL_FLOW_LEVEL) LOOP IF LOOP_FLOW.ETL_FLOW_LEVEL = 1 AND LOOP_FLOW.ETL_FLOW_STATUS = 1 AND LOOP_FLOW.ETL_FLOW_RUN_STATUS = 9 AND LOOP_FLOW.ETL_DATA_SUCC_TIME < LOOP_FLOW.ETL_NEXT_DATA_TIME THEN UPDATE ETL_FLOW_RUN_STS T SET T.ETL_DATA_START_TIME = CASE WHEN LOOP_FLOW.ETL_CYC_CODE = '01' THEN T.ETL_DATA_SUCC_TIME + 1 WHEN LOOP_FLOW.ETL_CYC_CODE = '02' THEN T.ETL_DATA_SUCC_TIME + 1 / 24 / 60 / 60 WHEN LOOP_FLOW.ETL_CYC_CODE = '03' THEN T.ETL_DATA_SUCC_TIME + 1 END ,T.ETL_DATA_END_TIME = T.ETL_NEXT_DATA_TIME ,T.ETL_START_TIME = NULL ,T.ETL_END_TIME = NULL ,T.ETL_FLOW_RUN_STATUS = 0 ,T.ETL_RESET_TIME = 0 WHERE T.ETL_FLOW_ID = LOOP_FLOW.ETL_FLOW_ID; COMMIT; ELSIF LOOP_FLOW.ETL_FLOW_LEVEL > 1 AND LOOP_FLOW.ETL_FLOW_STATUS = 1 AND FN_GET_SUPER_FLOW_RUN_STATUS(LOOP_FLOW.ETL_FLOW_ID) = 0 THEN DTE_DATA_START_TIME := FN_GET_SUPER_DATA_START_TIME(LOOP_FLOW.ETL_FLOW_ID); DTE_DATA_END_TIME := FN_GET_SUPER_DATA_END_TIME(LOOP_FLOW.ETL_FLOW_ID); UPDATE ETL_FLOW_RUN_STS T SET T.ETL_DATA_START_TIME = DTE_DATA_START_TIME ,T.ETL_DATA_END_TIME = DTE_DATA_END_TIME ,T.ETL_START_TIME = NULL ,T.ETL_END_TIME = NULL ,T.ETL_FLOW_RUN_STATUS = 0 WHERE T.ETL_FLOW_ID = LOOP_FLOW.ETL_FLOW_ID; COMMIT; END IF; END LOOP; FOR LOOP_JOB IN (SELECT A.ETL_JOB_ID ,T.ETL_DATA_START_TIME FLOW_DATA_START_TIME ,T.ETL_DATA_END_TIME FLOW_DATA_END_TIME ,T.ETL_FLOW_RUN_STATUS ,A.ETL_DATA_START_TIME JOB_DATA_START_TIME ,A.ETL_DATA_END_TIME JOB_DATA_END_TIME ,B.ETL_JOB_STATUS FROM ETL_FLOW_RUN_STS T ,ETL_JOB_RUN_STS A ,ETL_CTL_JOB_INFO B WHERE T.ETL_FLOW_ID = B.ETL_FLOW_ID AND A.ETL_JOB_ID = B.ETL_JOB_ID AND B.ETL_JOB_STATUS = 1) LOOP IF LOOP_JOB.ETL_FLOW_RUN_STATUS = 0 AND LOOP_JOB.ETL_JOB_STATUS = 1 THEN UPDATE ETL_JOB_RUN_STS T SET T.ETL_DATA_START_TIME = LOOP_JOB.FLOW_DATA_START_TIME ,T.ETL_DATA_END_TIME = LOOP_JOB.FLOW_DATA_END_TIME ,T.ETL_START_TIME = NULL ,T.ETL_END_TIME = NULL ,T.ETL_JOB_RUN_STATUS = 0 ,T.ETL_SESSION_ID = NULL ,T.ETL_LOG_DESC = NULL WHERE T.ETL_JOB_ID = LOOP_JOB.ETL_JOB_ID; END IF; END LOOP; COMMIT; END; /******************************************************************* 程序名 :SP_FLOW_RUN_NEW_PERIOD 创建人 : zhuyh 创建时间 : 2013/8/20 功能描述 : 运行新周期的数据 修改人 : 修改时间 : 修改原因 : *******************************************************************/ PROCEDURE SP_FLOW_RUN_NEW_PERIOD IS BEGIN FOR LOOP_FLOW IN (SELECT T.ETL_FLOW_ID ,T.ETL_FLOW_RUN_STATUS ,A.ETL_FLOW_LEVEL ,A.ETL_FLOW_STATUS FROM ETL_FLOW_RUN_STS T ,ETL_CTL_JOB_FLOW A WHERE T.ETL_FLOW_ID = A.ETL_FLOW_ID AND A.ETL_FLOW_STATUS = 1 ORDER BY A.ETL_FLOW_LEVEL) LOOP IF LOOP_FLOW.ETL_FLOW_LEVEL = 1 AND LOOP_FLOW.ETL_FLOW_RUN_STATUS IN (0, 3) AND FN_GET_FLOW_DEPEND(LOOP_FLOW.ETL_FLOW_ID) = 1 THEN UPDATE ETL_FLOW_RUN_STS T SET T.ETL_START_TIME = SYSDATE ,T.ETL_END_TIME = NULL ,T.ETL_FLOW_RUN_STATUS = 1 WHERE T.ETL_FLOW_ID = LOOP_FLOW.ETL_FLOW_ID; COMMIT; ELSIF LOOP_FLOW.ETL_FLOW_LEVEL > 1 AND FN_GET_SUPER_FLOW_RUN_STATUS(LOOP_FLOW.ETL_FLOW_ID) = 1 AND LOOP_FLOW.ETL_FLOW_RUN_STATUS IN (0, 3) AND FN_GET_FLOW_DEPEND(LOOP_FLOW.ETL_FLOW_ID) = 1 THEN UPDATE ETL_FLOW_RUN_STS T SET T.ETL_START_TIME = SYSDATE ,T.ETL_END_TIME = NULL ,T.ETL_FLOW_RUN_STATUS = 1 WHERE T.ETL_FLOW_ID = LOOP_FLOW.ETL_FLOW_ID; COMMIT; END IF; END LOOP; END; /******************************************************************* 程序名 :SP_FLOW_RUN_STATUS 创建人 : zhuyh 创建时间 : 2013/8/20 功能描述 : 作业流状态处理 修改人 : 修改时间 : 修改原因 : *******************************************************************/ PROCEDURE SP_FLOW_RUN_STATUS IS -- VAR_JOB_NO_SESSION VARCHAR2(100); -- 数据库进程不存在 VAR_JOB_OVERTIME_DESC VARCHAR2(100); -- 作业运行超时描述 INT_JOB_OVERTIME NUMBER; -- 作业超时告警时间 INT_LAST_RUNTIME NUMBER; -- 作业上次运行时间 -- INT_JOB_DEAD_TIME NUMBER; -- 调度作业数据库进程不存在运行判定时间 BEGIN -- 获取数据库进程不存在描述 /*SELECT T.ETL_PARA_VAL INTO VAR_JOB_NO_SESSION FROM ETL_CTL_PARA T WHERE UPPER(T.ETL_PARA_NAME) = 'ETL_JOB_NO_SESSION_DESC';*/ -- 获取作业运行超时告警时间 SELECT T.ETL_PARA_VAL INTO INT_JOB_OVERTIME FROM ETL_CTL_PARA T WHERE UPPER(T.ETL_PARA_NAME) = 'ETL_JOB_OVERTIME'; -- 调度作业数据库进程不存在运行判定时间 /*SELECT T.ETL_PARA_VAL INTO INT_JOB_DEAD_TIME FROM ETL_CTL_PARA T WHERE UPPER(T.ETL_PARA_NAME) = 'ETL_JOB_DEAD_TIME';*/ -- 获取作业运行超时描述 SELECT T.ETL_PARA_VAL INTO VAR_JOB_OVERTIME_DESC FROM ETL_CTL_PARA T WHERE UPPER(T.ETL_PARA_NAME) = 'ETL_JOB_OVERTIME_DESC'; FOR LOOP_JOB IN (SELECT T.ETL_JOB_ID ,A.ETL_OVERTIME_REM_WAY -- ,T.ETL_SESSION_ID ,(SYSDATE - T.ETL_START_TIME) RUNTIME FROM ETL_JOB_RUN_STS T ,ETL_CTL_JOB_INFO A WHERE T.ETL_JOB_ID = A.ETL_JOB_ID AND T.ETL_JOB_RUN_STATUS = 1) LOOP -- 作业为正在运行,但数据库进程已经不存在(10分钟)的作业置为运行失败 /*IF FN_GET_SESSION_STATUS(LOOP_JOB.ETL_SESSION_ID) = 0 AND LOOP_JOB.RUNTIME * 24 * 60 >= INT_JOB_DEAD_TIME THEN SP_JOB_RUN_STATUS(LOOP_JOB.ETL_JOB_ID, '', 2, VAR_JOB_NO_SESSION); -- 超时提醒方式为超过上次运行时间 ELS*/ IF LOOP_JOB.ETL_OVERTIME_REM_WAY = 1 AND INT_JOB_OVERTIME > 0 THEN BEGIN SELECT RUNTIME INTO INT_LAST_RUNTIME FROM (SELECT T.ETL_LOGID ,T.ETL_JOB_ID ,(T.ETL_END_TIME - T.ETL_START_TIME) * 24 * 60 RUNTIME ,ROW_NUMBER() OVER(ORDER BY T.ETL_LOGID DESC) ROW_NUM FROM ETL_JOB_RUN_LOG T WHERE T.ETL_JOB_ID = LOOP_JOB.ETL_JOB_ID AND T.ETL_JOB_RUN_STATUS = 9) WHERE ROW_NUM = 1; IF LOOP_JOB.RUNTIME - INT_LAST_RUNTIME > INT_JOB_OVERTIME THEN -- 修改作业状态为运行超时 SP_JOB_RUN_STATUS(LOOP_JOB.ETL_JOB_ID, '', 4, VAR_JOB_OVERTIME_DESC); END IF; EXCEPTION WHEN OTHERS THEN NULL; END; -- 超时提醒方式为超过前三次的平均值 ELSIF LOOP_JOB.ETL_OVERTIME_REM_WAY = 2 AND INT_JOB_OVERTIME > 0 THEN BEGIN SELECT AVG(RUNTIME) INTO INT_LAST_RUNTIME FROM (SELECT T.ETL_LOGID ,T.ETL_JOB_ID ,(T.ETL_END_TIME - T.ETL_START_TIME) * 24 * 60 RUNTIME ,ROW_NUMBER() OVER(ORDER BY T.ETL_LOGID DESC) ROW_NUM FROM ETL_JOB_RUN_LOG T WHERE T.ETL_JOB_ID = LOOP_JOB.ETL_JOB_ID AND T.ETL_JOB_RUN_STATUS = 9) WHERE ROW_NUM <= 3; IF LOOP_JOB.RUNTIME - INT_LAST_RUNTIME > INT_JOB_OVERTIME THEN -- 修改作业状态为运行超时 SP_JOB_RUN_STATUS(LOOP_JOB.ETL_JOB_ID, '', 4, VAR_JOB_OVERTIME_DESC); END IF; EXCEPTION WHEN OTHERS THEN NULL; END; END IF; END LOOP; FOR LOOP_FLOW IN (SELECT T.ETL_FLOW_ID ,T.ETL_DATA_END_TIME FROM ETL_FLOW_RUN_STS T ,ETL_CTL_JOB_FLOW A WHERE T.ETL_FLOW_ID = A.ETL_FLOW_ID AND T.ETL_FLOW_RUN_STATUS = 1 ORDER BY A.ETL_FLOW_LEVEL DESC) LOOP -- 下属作业运行成功将作业流状态置为成功 IF FN_GET_FLOW_RUN_STATUS(LOOP_FLOW.ETL_FLOW_ID) = 9 THEN UPDATE ETL_FLOW_RUN_STS T SET T.ETL_DATA_SUCC_TIME = LOOP_FLOW.ETL_DATA_END_TIME ,T.ETL_END_TIME = SYSDATE ,T.ETL_FLOW_RUN_STATUS = 9 WHERE T.ETL_FLOW_ID = LOOP_FLOW.ETL_FLOW_ID; COMMIT; SP_ETL_FLOW_LOG_INFO(LOOP_FLOW.ETL_FLOW_ID); -- 下属作业运行失败将作业流置为失败 ELSIF FN_GET_FLOW_RUN_STATUS(LOOP_FLOW.ETL_FLOW_ID) = 2 THEN UPDATE ETL_FLOW_RUN_STS T SET T.ETL_END_TIME = SYSDATE ,T.ETL_FLOW_RUN_STATUS = 2 WHERE T.ETL_FLOW_ID = LOOP_FLOW.ETL_FLOW_ID; COMMIT; SP_ETL_FLOW_LOG_INFO(LOOP_FLOW.ETL_FLOW_ID); END IF; END LOOP; END; /******************************************************************* 程序名 :SP_FLOW_RUN_ERROR_RESET 创建人 : zhuyh 创建时间 : 2013/8/20 功能描述 : 失败任务重置,等待重新运行 修改人 : 修改时间 : 修改原因 : *******************************************************************/ PROCEDURE SP_FLOW_RUN_ERROR_RESET IS VAR_MAX_RESET_TIME NUMBER; BEGIN -- 获取最大任务重置次数 SELECT T.ETL_PARA_VAL INTO VAR_MAX_RESET_TIME FROM ETL_CTL_PARA T WHERE UPPER(T.ETL_PARA_NAME) = 'ETL_MAX_RESET_TIME'; FOR LOOP_FLOW IN (SELECT T.ETL_FLOW_ID ,T.ETL_FLOW_RUN_STATUS ,A.ETL_FLOW_LEVEL ,T.ETL_RESET_TIME FROM ETL_FLOW_RUN_STS T ,ETL_CTL_JOB_FLOW A WHERE T.ETL_FLOW_ID = A.ETL_FLOW_ID AND A.ETL_FLOW_STATUS = 1 AND T.ETL_FLOW_RUN_STATUS = 2 ORDER BY A.ETL_FLOW_LEVEL) LOOP IF LOOP_FLOW.ETL_FLOW_LEVEL = 1 AND (VAR_MAX_RESET_TIME = 0 OR LOOP_FLOW.ETL_RESET_TIME < VAR_MAX_RESET_TIME) THEN UPDATE ETL_FLOW_RUN_STS T SET /*T.ETL_START_TIME = NULL ,*/ T.ETL_END_TIME = NULL ,T.ETL_FLOW_RUN_STATUS = 3 ,T.ETL_RESET_TIME = T.ETL_RESET_TIME + 1 WHERE T.ETL_FLOW_ID = LOOP_FLOW.ETL_FLOW_ID; COMMIT; ELSIF LOOP_FLOW.ETL_FLOW_LEVEL > 1 AND FN_GET_SUPER_FLOW_RUN_STATUS(LOOP_FLOW.ETL_FLOW_ID) = 3 THEN UPDATE ETL_FLOW_RUN_STS T SET /*T.ETL_START_TIME = NULL ,*/ T.ETL_END_TIME = NULL ,T.ETL_FLOW_RUN_STATUS = 3 WHERE T.ETL_FLOW_ID = LOOP_FLOW.ETL_FLOW_ID; COMMIT; END IF; END LOOP; FOR LOOP_JOB IN (SELECT A.ETL_JOB_ID ,T.ETL_FLOW_RUN_STATUS ,A.ETL_JOB_RUN_STATUS FROM ETL_FLOW_RUN_STS T ,ETL_JOB_RUN_STS A ,ETL_CTL_JOB_INFO B WHERE T.ETL_FLOW_ID = B.ETL_FLOW_ID AND A.ETL_JOB_ID = B.ETL_JOB_ID AND B.ETL_JOB_STATUS = 1) LOOP -- 将运行失败的作业置为重新运行,等待重新运行 IF LOOP_JOB.ETL_FLOW_RUN_STATUS = 3 AND LOOP_JOB.ETL_JOB_RUN_STATUS = 2 THEN UPDATE ETL_JOB_RUN_STS T SET /*T.ETL_START_TIME = NULL ,*/ T.ETL_END_TIME = NULL ,T.ETL_JOB_RUN_STATUS = 3 ,T.ETL_SESSION_ID = NULL ,T.ETL_LOG_DESC = NULL WHERE T.ETL_JOB_ID = LOOP_JOB.ETL_JOB_ID; END IF; END LOOP; COMMIT; END; /******************************************************************* 程序名 :SP_JOB_RUN_STATUS 创建人 : zhuyh 创建时间 : 2013/8/20 功能描述 : 作业运行状态处理 修改人 : zhuyh 修改时间 :2013/9/30 修改原因 : 进程ID由记录数据库进程改为记录操作系统进程 *******************************************************************/ PROCEDURE SP_JOB_RUN_STATUS ( I_JOB_ID NUMBER ,I_ORG_ID VARCHAR2 ,I_JOB_RUN_STATUS VARCHAR2 ,I_JOB_RUN_INFO VARCHAR2 ) IS -- VAR_SESSION_ID VARCHAR2(10); DTE_DATA_END_TIME DATE; VAR_JOB_SUCC_DESC VARCHAR2(100); VAR_JOB_ERR_DESC VARCHAR2(100); BEGIN -- 获取作业运行成功描述 SELECT T.ETL_PARA_VAL INTO VAR_JOB_SUCC_DESC FROM ETL_CTL_PARA T WHERE UPPER(T.ETL_PARA_NAME) = 'ETL_JOB_SUCC_DESC'; -- 获取作业运行失败描述 SELECT T.ETL_PARA_VAL INTO VAR_JOB_ERR_DESC FROM ETL_CTL_PARA T WHERE UPPER(T.ETL_PARA_NAME) = 'ETL_JOB_ERR_DESC'; SELECT ETL_DATA_END_TIME INTO DTE_DATA_END_TIME FROM ETL_JOB_RUN_STS T WHERE T.ETL_JOB_ID = I_JOB_ID; -- 正在运行 IF I_JOB_RUN_STATUS = 1 THEN -- del by zhuyh 2013/9/30 进程ID由记录数据库进程改为记录操作系统进程 -- 获取数据库进程 -- SELECT SYS_CONTEXT('USERENV', 'SID') INTO VAR_SESSION_ID FROM DUAL; UPDATE ETL_JOB_RUN_STS T SET /*T.ETL_START_TIME = SYSDATE ,*/ T.ETL_DATA_ORG_ID = I_ORG_ID ,T.ETL_JOB_RUN_STATUS = I_JOB_RUN_STATUS -- del by zhuyh 2013/9/30 进程ID由记录数据库进程改为记录操作系统进程 -- ,T.ETL_SESSION_ID = VAR_SESSION_ID ,T.ETL_LOG_DESC = I_JOB_RUN_INFO WHERE T.ETL_JOB_ID = I_JOB_ID; COMMIT; -- 运行失败 ELSIF I_JOB_RUN_STATUS = 2 THEN UPDATE ETL_JOB_RUN_STS T SET T.ETL_END_TIME = SYSDATE ,T.ETL_JOB_RUN_STATUS = I_JOB_RUN_STATUS ,T.ETL_LOG_DESC = VAR_JOB_ERR_DESC || I_JOB_RUN_INFO WHERE T.ETL_JOB_ID = I_JOB_ID; COMMIT; -- 写失败日志 SP_ETL_JOB_LOG_INFO(I_JOB_ID); -- 运行成功 ELSIF I_JOB_RUN_STATUS = 9 THEN UPDATE ETL_JOB_RUN_STS T SET T.ETL_DATA_SUCC_TIME = DTE_DATA_END_TIME ,T.ETL_END_TIME = SYSDATE ,T.ETL_JOB_RUN_STATUS = I_JOB_RUN_STATUS ,T.ETL_LOG_DESC = VAR_JOB_SUCC_DESC || I_JOB_RUN_INFO WHERE T.ETL_JOB_ID = I_JOB_ID; COMMIT; -- 写失败日志 SP_ETL_JOB_LOG_INFO(I_JOB_ID); END IF; END; /******************************************************************* 程序名 :SP_ETL_FLOW_LOG_INFO 创建人 : zhuyh 创建时间 : 2013/8/20 功能描述 : 记录作业流运行日志 修改人 : 修改时间 : 修改原因 : *******************************************************************/ PROCEDURE SP_ETL_FLOW_LOG_INFO(I_FLOW_ID NUMBER) IS DTE_DATA_START_TIME DATE; DTE_DATA_END_TIME DATE; INT_COUNT NUMBER; VAR_ETL_LOGID VARCHAR2(30); -- 日志序号在每个数据周期内排序 BEGIN SELECT T.ETL_DATA_START_TIME ,T.ETL_DATA_END_TIME INTO DTE_DATA_START_TIME ,DTE_DATA_END_TIME FROM ETL_FLOW_RUN_STS T WHERE T.ETL_FLOW_ID = I_FLOW_ID; -- 检查该数据周期有没有运行过 SELECT COUNT(*) INTO INT_COUNT FROM ETL_FLOW_RUN_LOG T WHERE T.ETL_FLOW_ID = I_FLOW_ID AND T.ETL_DATA_START_TIME = DTE_DATA_START_TIME AND T.ETL_DATA_END_TIME = DTE_DATA_END_TIME; -- 未运行过的使用数据开始时间从新编号 IF INT_COUNT = 0 THEN VAR_ETL_LOGID := TO_CHAR(DTE_DATA_START_TIME, 'YYYYMMDDHH24MISS') || TO_CHAR(DTE_DATA_END_TIME, 'YYYYMMDDHH24MISS') || '01'; ELSE -- 运行过的用最大编号加1 SELECT MAX(ETL_LOGID) INTO VAR_ETL_LOGID FROM ETL_FLOW_RUN_LOG T WHERE T.ETL_FLOW_ID = I_FLOW_ID AND T.ETL_DATA_START_TIME = DTE_DATA_START_TIME AND T.ETL_DATA_END_TIME = DTE_DATA_END_TIME; VAR_ETL_LOGID := VAR_ETL_LOGID + 1; END IF; -- 记日志表 INSERT INTO ETL_FLOW_RUN_LOG (ETL_LOGID ,ETL_FLOW_ID ,ETL_DATA_START_TIME ,ETL_DATA_END_TIME ,ETL_START_TIME ,ETL_END_TIME ,ETL_FLOW_RUN_STATUS ,ETL_LOG_DESC) SELECT VAR_ETL_LOGID ,T.ETL_FLOW_ID ,T.ETL_DATA_START_TIME ,T.ETL_DATA_END_TIME ,T.ETL_START_TIME ,T.ETL_END_TIME ,T.ETL_FLOW_RUN_STATUS ,T.ETL_LOG_DESC FROM ETL_FLOW_RUN_STS T WHERE T.ETL_FLOW_ID = I_FLOW_ID; COMMIT; END; /******************************************************************* 程序名 :SP_ETL_JOB_LOG_INFO 创建人 : zhuyh 创建时间 : 2013/8/20 功能描述 : 记录作业运行日志 修改人 : 修改时间 : 修改原因 : *******************************************************************/ PROCEDURE SP_ETL_JOB_LOG_INFO(I_JOB_ID NUMBER) IS DTE_DATA_START_TIME DATE; DTE_DATA_END_TIME DATE; INT_COUNT NUMBER; VAR_ETL_LOGID VARCHAR2(30); -- 日志序号在每个数据周期内排序 BEGIN SELECT T.ETL_DATA_START_TIME ,T.ETL_DATA_END_TIME INTO DTE_DATA_START_TIME ,DTE_DATA_END_TIME FROM ETL_JOB_RUN_STS T WHERE T.ETL_JOB_ID = I_JOB_ID; -- 检测该数据周期任务有没有运行过 SELECT COUNT(*) INTO INT_COUNT FROM ETL_JOB_RUN_LOG T WHERE T.ETL_JOB_ID = I_JOB_ID AND T.ETL_DATA_START_TIME = DTE_DATA_START_TIME AND T.ETL_DATA_END_TIME = DTE_DATA_END_TIME; -- 未运行过的作业使用数据结束时间重新编号 IF INT_COUNT = 0 THEN VAR_ETL_LOGID := TO_CHAR(DTE_DATA_START_TIME, 'YYYYMMDDHH24MISS') || TO_CHAR(DTE_DATA_END_TIME, 'YYYYMMDDHH24MISS') || '01'; ELSE -- 运行过的作业用最大编号加1 SELECT MAX(ETL_LOGID) INTO VAR_ETL_LOGID FROM ETL_JOB_RUN_LOG T WHERE T.ETL_JOB_ID = I_JOB_ID AND T.ETL_DATA_START_TIME = DTE_DATA_START_TIME AND T.ETL_DATA_END_TIME = DTE_DATA_END_TIME; VAR_ETL_LOGID := VAR_ETL_LOGID + 1; END IF; -- 记日志表 INSERT INTO ETL_JOB_RUN_LOG (ETL_LOGID ,ETL_JOB_ID ,ETL_DATA_START_TIME ,ETL_DATA_END_TIME ,ETL_DATA_ORG_ID ,ETL_START_TIME ,ETL_END_TIME ,ETL_JOB_RUN_STATUS ,ETL_LOG_DESC) SELECT VAR_ETL_LOGID ,T.ETL_JOB_ID ,T.ETL_DATA_START_TIME ,T.ETL_DATA_END_TIME ,T.ETL_DATA_ORG_ID ,T.ETL_START_TIME ,T.ETL_END_TIME ,T.ETL_JOB_RUN_STATUS ,T.ETL_LOG_DESC FROM ETL_JOB_RUN_STS T WHERE T.ETL_JOB_ID = I_JOB_ID; COMMIT; END; /******************************************************************* 程序名 :SP_INSERT_JOB_FAIL_SMS 创建人 : zhuyh 创建时间 : 2013/8/20 功能描述 : 生成失败作业短信 修改人 : 修改时间 : 修改原因 : *******************************************************************/ PROCEDURE SP_INSERT_MONITOR_SMS ( O_RESULT_FLAG OUT VARCHAR2 /*过程执行结果返回给调度 9 成功 2 失败*/ ,O_RESULT_MSG OUT VARCHAR2 /*过程执行结果信息返回给调度*/ ) IS V_DTE_RUN_BEGIN_DT DATE; /*程序每一步骤运行开始*/ V_DTE_RUN_END_DT DATE; /*程序每一步骤运行结束时间*/ V_INT_STEP NUMBER := 0; /*程序执行步骤*/ /*步骤描述信息*/ V_VAR_STEP_DESC VARCHAR2(1000); /*步骤所执行的DML类型*/ V_VAR_STEP_DML_TYPE VARCHAR2(10); /*受影响行数*/ V_INT_ROW_CNT INTEGER := 0; /*过程名称*/ V_VAR_PROC_NAME VARCHAR2(70) := 'PKG_SEND_SMS.SP_INSERT_SMS'; BEGIN V_INT_STEP := V_INT_STEP + 1; /*第一步骤*/ V_VAR_STEP_DESC := V_INT_STEP || '.0:作业运行失败发送短信给运营人员 '; V_VAR_STEP_DML_TYPE := 'INSERT'; /*操作类型*/ /*DML开始运行时间*/ V_DTE_RUN_BEGIN_DT := SYSDATE; /*执行相应的SQL语句*/ FOR LOOP_JOB IN (SELECT C.MOBLIE_PHONE ,T.ETL_LOGID ,A.ETL_JOB_ID ,A.ETL_JOB_NAME ,A.ETL_JOB_DESC ,T.ETL_DATA_START_TIME ,T.ETL_DATA_END_TIME ,T.ETL_DATA_ORG_ID ,T.ETL_LOG_DESC FROM ETL_JOB_RUN_LOG T ,ETL_CTL_JOB_INFO A ,ETL_SEND_SMS_LIST C WHERE T.ETL_JOB_ID = A.ETL_JOB_ID AND T.ETL_JOB_RUN_STATUS = 2 AND T.ETL_SEND_FLAG = 0 AND C.SEND_MONITOR_SMS = 1) LOOP INSERT INTO ETL_SEND_SMS_LOG (ETL_LOGID ,ETL_JOB_ID ,MOBLIE_PHONE ,SMS_CONTENT ,SMS_LEVEL ,SEND_TIME ,ETL_DATE) VALUES (LOOP_JOB.ETL_LOGID ,LOOP_JOB.ETL_JOB_ID ,LOOP_JOB.MOBLIE_PHONE ,'作业ID号[' || LOOP_JOB.ETL_JOB_ID || '] ,作业名称[' || LOOP_JOB.ETL_JOB_NAME || '],作业描述[' || LOOP_JOB.ETL_JOB_DESC || '],数据周期[' || TO_CHAR(LOOP_JOB.ETL_DATA_START_TIME, 'YYYYMMDDHH24MISS') || '-' || TO_CHAR(LOOP_JOB.ETL_DATA_END_TIME, 'YYYYMMDDHH24MISS') || '],机构号[' || LOOP_JOB.ETL_DATA_ORG_ID || '],运行描述[' || LOOP_JOB.ETL_LOG_DESC || ']' ,1 ,TRUNC(SYSDATE) ,SYSDATE); /*获取受影响行数*/ V_INT_ROW_CNT := V_INT_ROW_CNT + 1; UPDATE ETL_JOB_RUN_LOG T SET T.ETL_SEND_FLAG = 1 WHERE T.ETL_JOB_ID = LOOP_JOB.ETL_JOB_ID AND T.ETL_LOGID = LOOP_JOB.ETL_LOGID; COMMIT; ---提交DML操作 END LOOP; /*DML运行结束时间*/ V_DTE_RUN_END_DT := SYSDATE; /*记录成功的日志信息*/ IF V_INT_ROW_CNT > 0 THEN PKG_PUBLIC.SP_ETL_LOAD_DML_LOG(V_INT_STEP ,SYSDATE /*数据开始日期*/ ,SYSDATE /*数据结束日期*/ ,1 /*机构代码*/ ,V_VAR_PROC_NAME /*存储过程名称*/ ,V_VAR_STEP_DESC /*操作步骤描述*/ ,V_VAR_STEP_DML_TYPE /*操作类型*/ ,V_INT_ROW_CNT /*受影响行数*/ ,1 /*执行结果*/ ,V_DTE_RUN_BEGIN_DT /*运行开始时间*/ ,V_DTE_RUN_END_DT /*运行结束时间*/ ,'' /*运行结果详细信息*/); END IF; /*整个过程执行成功*/ O_RESULT_FLAG := 9; /*整个过程运行结果描述信息*/ O_RESULT_MSG := ''; /*异常处理部分*/ EXCEPTION WHEN OTHERS THEN ---回滚DML操作 ROLLBACK; O_RESULT_FLAG := 2; ----失败 O_RESULT_MSG := SQLERRM; ---记录异常日志信息 PKG_PUBLIC.SP_ETL_LOAD_DML_LOG(V_INT_STEP ,SYSDATE /*数据开始日期*/ ,SYSDATE /*数据结束日期*/ ,1 /*机构代码*/ ,V_VAR_PROC_NAME /*存储过程名称*/ ,V_VAR_STEP_DESC /*操作步骤描述*/ ,V_VAR_STEP_DML_TYPE /*操作步骤类型*/ ,V_INT_ROW_CNT /*返回的受影响行数*/ ,0 /*运行结果 0 失败; 1 成功*/ ,V_DTE_RUN_BEGIN_DT /*运行开始时间*/ ,V_DTE_RUN_END_DT /*运行结束时间*/ ,SQLERRM /*运行结果详细信息*/); END; /******************************************************************* 程序名 :SP_SEND_JOB_FAIL_SMS 创建人 : zhuyh 创建时间 : 2013/8/20 功能描述 : 发送失败作业短信 修改人 : 修改时间 : 修改原因 : *******************************************************************/ PROCEDURE SP_SEND_MONITOR_SMS IS V_VAR_RESULT_FLAG CHAR(1); V_VAR_RESULT_MSG CHAR(300); V_VAR_RETURN_STATUS INT; CURSOR C_DATA IS SELECT T.ETL_LOGID ,T.ETL_JOB_ID ,T.MOBLIE_PHONE ,T.SMS_CONTENT ,T.SMS_LEVEL ,T.SEND_TIME FROM ETL_SEND_SMS_LOG T WHERE T.SEND_STATUS = 0; V_USER_CODE VARCHAR2(100) DEFAULT 'MIS'; V_PASSWORD VARCHAR2(100) DEFAULT 'MIS#2013'; BEGIN SP_INSERT_MONITOR_SMS(V_VAR_RESULT_FLAG, V_VAR_RESULT_MSG); IF V_VAR_RESULT_FLAG = 9 THEN FOR CC_DATA IN C_DATA LOOP PKG_SMS_INTERFACE.SEND_SMS(V_USER_CODE ,V_PASSWORD ,CC_DATA.MOBLIE_PHONE ,CC_DATA.SMS_CONTENT ,CC_DATA.SMS_LEVEL ,CC_DATA.SEND_TIME ,V_VAR_RETURN_STATUS); --发送成功,更新标志 IF V_VAR_RETURN_STATUS > 0 THEN UPDATE ETL_SEND_SMS_LOG T SET T.SEND_STATUS = '1' ,T.RECEIVE_STATUS = V_VAR_RETURN_STATUS WHERE T.ETL_LOGID = CC_DATA.ETL_LOGID AND T.ETL_JOB_ID = CC_DATA.ETL_JOB_ID AND T.MOBLIE_PHONE = CC_DATA.MOBLIE_PHONE; END IF; END LOOP; END IF; COMMIT; END; /******************************************************************* 程序名 :FN_GET_FLOW_DEPEND 创建人 : zhuyh 创建时间 : 2013/8/20 功能描述 : 获取作业流前置依赖 修改人 : 修改时间 : 修改原因 : *******************************************************************/ FUNCTION FN_GET_FLOW_DEPEND(I_FLOW_ID NUMBER) RETURN VARCHAR2 IS VAR_DEP_STS VARCHAR2(100) := 1; BEGIN FOR LOOP_DEP IN (SELECT A.ETL_DATA_END_TIME ,NVL(B.ETL_DATA_SUCC_TIME, DATE '1900-1-1') ETL_DEP_SUCC_TIME ,PKG_ETL_CTL.FN_GET_CYC_CODE(T.ETL_FLOW_ID) ETL_CYC_CODE ,PKG_ETL_CTL.FN_GET_CYC_CODE(T.ETL_DEPD_FLOW_ID) DEP_CYC_CODE FROM ETL_CTL_FLOW_DEPD T ,ETL_FLOW_RUN_STS A ,ETL_FLOW_RUN_STS B WHERE T.ETL_FLOW_ID = A.ETL_FLOW_ID AND T.ETL_DEPD_FLOW_ID = B.ETL_FLOW_ID AND T.ETL_FLOW_ID = I_FLOW_ID) LOOP -- 数据结束时间大于所依赖的数据成功时间 IF (LOOP_DEP.ETL_DATA_END_TIME > LOOP_DEP.ETL_DEP_SUCC_TIME AND LOOP_DEP.ETL_CYC_CODE = LOOP_DEP.DEP_CYC_CODE) OR (LOOP_DEP.ETL_DATA_END_TIME >= TRUNC(LOOP_DEP.ETL_DEP_SUCC_TIME) AND LOOP_DEP.ETL_CYC_CODE <> '02' AND LOOP_DEP.DEP_CYC_CODE = '02') OR (LOOP_DEP.ETL_DATA_END_TIME > LOOP_DEP.ETL_DEP_SUCC_TIME AND LOOP_DEP.DEP_CYC_CODE <> '02') THEN VAR_DEP_STS := 0; RETURN VAR_DEP_STS; END IF; END LOOP; RETURN VAR_DEP_STS; EXCEPTION WHEN OTHERS THEN RETURN NULL; END; /******************************************************************* 程序名 :FN_GET_JOB_DEPEND 创建人 : zhuyh 创建时间 : 2013/8/20 功能描述 : 获取作业前置依赖 修改人 : 修改时间 : 修改原因 : *******************************************************************/ FUNCTION FN_GET_JOB_DEPEND(I_JOB_ID NUMBER) RETURN VARCHAR2 IS VAR_DEP_STS VARCHAR2(100) := 1; BEGIN FOR LOOP_DEP IN (SELECT A.ETL_DATA_END_TIME ,NVL(B.ETL_DATA_SUCC_TIME, DATE '1900-1-1') ETL_DEP_SUCC_TIME FROM ETL_CTL_JOB_DEPD T ,ETL_JOB_RUN_STS A ,ETL_JOB_RUN_STS B WHERE T.ETL_JOB_ID = A.ETL_JOB_ID AND T.ETL_DEPD_JOB_ID = B.ETL_JOB_ID AND T.ETL_JOB_ID = I_JOB_ID) LOOP -- 数据结束时间大于所依赖的数据成功时间 IF LOOP_DEP.ETL_DATA_END_TIME > LOOP_DEP.ETL_DEP_SUCC_TIME THEN VAR_DEP_STS := 0; RETURN VAR_DEP_STS; END IF; END LOOP; RETURN VAR_DEP_STS; END; /******************************************************************* 程序名 :FN_GET_FLOW_RUN_STATUS 创建人 : zhuyh 创建时间 : 2013/8/20 功能描述 : 获取作业流运行状态 修改人 : 修改时间 : 修改原因 : *******************************************************************/ FUNCTION FN_GET_FLOW_RUN_STATUS(I_FLOW_ID NUMBER) RETURN VARCHAR2 IS VAR_FLOW_RUN_STATUS VARCHAR2(100) := 1; VAR_CHILD_FLAG CHAR(1); INT_STS_NOT_9 NUMBER; INT_STS_0_3 NUMBER; INT_STS_1_4 NUMBER; INT_STS_2 NUMBER; BEGIN SELECT T.ETL_CHILD_FLAG INTO VAR_CHILD_FLAG FROM ETL_CTL_JOB_FLOW T WHERE T.ETL_FLOW_ID = I_FLOW_ID; -- 不成功的数量 IF VAR_CHILD_FLAG = 0 THEN SELECT COUNT(*) INTO INT_STS_NOT_9 FROM ETL_FLOW_RUN_STS T ,ETL_CTL_JOB_FLOW A WHERE T.ETL_FLOW_ID = A.ETL_FLOW_ID AND A.ETL_FLOW_STATUS = 1 AND A.ETL_SUPER_FLOW_ID = I_FLOW_ID AND ETL_FLOW_RUN_STATUS <> 9; -- 正在运行的数量 SELECT COUNT(*) INTO INT_STS_1_4 FROM ETL_FLOW_RUN_STS T ,ETL_CTL_JOB_FLOW A WHERE T.ETL_FLOW_ID = A.ETL_FLOW_ID AND A.ETL_FLOW_STATUS = 1 AND A.ETL_SUPER_FLOW_ID = I_FLOW_ID AND ETL_FLOW_RUN_STATUS IN (1, 4); -- 满足条件但未运行的数量 SELECT COUNT(*) INTO INT_STS_0_3 FROM ETL_FLOW_RUN_STS T ,ETL_CTL_JOB_FLOW A WHERE T.ETL_FLOW_ID = A.ETL_FLOW_ID AND A.ETL_FLOW_STATUS = 1 AND A.ETL_SUPER_FLOW_ID = I_FLOW_ID AND ETL_FLOW_RUN_STATUS IN (0, 3) AND FN_GET_FLOW_DEPEND(T.ETL_FLOW_ID) = 1; -- 运行失败的数量 SELECT COUNT(*) INTO INT_STS_2 FROM ETL_FLOW_RUN_STS T ,ETL_CTL_JOB_FLOW A WHERE T.ETL_FLOW_ID = A.ETL_FLOW_ID AND A.ETL_FLOW_STATUS = 1 AND A.ETL_SUPER_FLOW_ID = I_FLOW_ID AND ETL_FLOW_RUN_STATUS = 2; ELSIF VAR_CHILD_FLAG = 1 THEN SELECT COUNT(*) INTO INT_STS_NOT_9 FROM ETL_JOB_RUN_STS T ,ETL_CTL_JOB_INFO A WHERE T.ETL_JOB_ID = A.ETL_JOB_ID AND A.ETL_JOB_STATUS = 1 AND A.ETL_FLOW_ID = I_FLOW_ID AND ETL_JOB_RUN_STATUS <> 9; -- 正在运行的数量 SELECT COUNT(*) INTO INT_STS_1_4 FROM ETL_JOB_RUN_STS T ,ETL_CTL_JOB_INFO A WHERE T.ETL_JOB_ID = A.ETL_JOB_ID AND A.ETL_JOB_STATUS = 1 AND A.ETL_FLOW_ID = I_FLOW_ID AND ETL_JOB_RUN_STATUS IN (1, 4); -- 满足条件但未运行的数量 SELECT COUNT(*) INTO INT_STS_0_3 FROM ETL_JOB_RUN_STS T ,ETL_CTL_JOB_INFO A WHERE T.ETL_JOB_ID = A.ETL_JOB_ID AND A.ETL_JOB_STATUS = 1 AND A.ETL_FLOW_ID = I_FLOW_ID AND ETL_JOB_RUN_STATUS IN (0, 3) AND FN_GET_FLOW_DEPEND(T.ETL_JOB_ID) = 1; -- 运行失败的数量 SELECT COUNT(*) INTO INT_STS_2 FROM ETL_JOB_RUN_STS T ,ETL_CTL_JOB_INFO A WHERE T.ETL_JOB_ID = A.ETL_JOB_ID AND A.ETL_JOB_STATUS = 1 AND A.ETL_FLOW_ID = I_FLOW_ID AND ETL_JOB_RUN_STATUS = 2; END IF; -- 不成功的数量为0,则全部成功 IF INT_STS_NOT_9 = 0 THEN VAR_FLOW_RUN_STATUS := 9; -- 不成功的不为0,正在运行的为0,运行失败的数量大于0 ELSIF INT_STS_1_4 = 0 AND INT_STS_0_3 = 0 AND INT_STS_2 > 0 THEN VAR_FLOW_RUN_STATUS := 2; END IF; RETURN VAR_FLOW_RUN_STATUS; END; /******************************************************************* 程序名 :FN_GET_NEXT_DATA_TIME 创建人 : zhuyh 创建时间 : 2013/8/20 功能描述 : 获取下个数据时间 修改人 : 修改时间 : 修改原因 : *******************************************************************/ FUNCTION FN_GET_NEXT_DATA_TIME(I_FLOW_ID NUMBER) RETURN DATE IS VAR_CTL_CYC_CODE VARCHAR2(2); VAR_SQL VARCHAR2(1000); VAR_FREQ_TIME VARCHAR2(100); VAR_SRC_DB VARCHAR2(100); DTE_SRC_SYS_TIME DATE; DTE_NEXT_DATA_TIME DATE; DTE_DATA_SUCC_TIME DATE; BEGIN -- 获取运行周期、源数据库,下次运行时间、数据成功时间 SELECT T.ETL_CYC_CODE ,T.ETL_SRC_DB ,NVL(A.ETL_NEXT_DATA_TIME, A.ETL_DATA_END_TIME) ,NVL(A.ETL_DATA_SUCC_TIME, A.ETL_DATA_END_TIME) INTO VAR_CTL_CYC_CODE ,VAR_SRC_DB ,DTE_NEXT_DATA_TIME ,DTE_DATA_SUCC_TIME FROM ETL_CTL_JOB_FLOW T ,ETL_FLOW_RUN_STS A WHERE T.ETL_FLOW_ID = A.ETL_FLOW_ID AND T.ETL_FLOW_ID = I_FLOW_ID; -- 非本数据库 VAR_SRC_DB := VAR_SRC_DB || 'dual'; -- 每天运行的作业流 IF VAR_CTL_CYC_CODE = '01' THEN VAR_FREQ_TIME := 1; VAR_SQL := 'SELECT TRUNC(SYSDATE) FROM ' || VAR_SRC_DB; EXECUTE IMMEDIATE VAR_SQL INTO DTE_SRC_SYS_TIME; DTE_NEXT_DATA_TIME := DTE_DATA_SUCC_TIME + VAR_FREQ_TIME; IF DTE_NEXT_DATA_TIME < DTE_SRC_SYS_TIME THEN RETURN DTE_NEXT_DATA_TIME; ELSE RETURN DTE_DATA_SUCC_TIME; END IF; -- 每十分钟运行的作业流 ELSIF VAR_CTL_CYC_CODE = '02' THEN VAR_FREQ_TIME := 1 / 24 / 6; VAR_SQL := 'SELECT SYSDATE - 1 / 24 / 6 FROM ' || VAR_SRC_DB; EXECUTE IMMEDIATE VAR_SQL INTO DTE_SRC_SYS_TIME; WHILE DTE_NEXT_DATA_TIME < DTE_SRC_SYS_TIME LOOP DTE_NEXT_DATA_TIME := DTE_NEXT_DATA_TIME + VAR_FREQ_TIME; END LOOP; RETURN DTE_NEXT_DATA_TIME; ELSIF VAR_CTL_CYC_CODE = '03' THEN VAR_FREQ_TIME := 1; VAR_SQL := 'SELECT TRUNC(SYSDATE) FROM ' || VAR_SRC_DB; EXECUTE IMMEDIATE VAR_SQL INTO DTE_SRC_SYS_TIME; DTE_NEXT_DATA_TIME := ADD_MONTHS(DTE_DATA_SUCC_TIME, VAR_FREQ_TIME); IF DTE_NEXT_DATA_TIME < DTE_SRC_SYS_TIME THEN RETURN DTE_NEXT_DATA_TIME; ELSE RETURN DTE_DATA_SUCC_TIME; END IF; END IF; END; /******************************************************************* 程序名 :FN_GET_SUPER_FLOW_RUN_STATUS 创建人 : zhuyh 创建时间 : 2013/8/20 功能描述 : 获取上级作业流运行状态 修改人 : 修改时间 : 修改原因 : *******************************************************************/ FUNCTION FN_GET_SUPER_FLOW_RUN_STATUS(I_FLOW_ID NUMBER) RETURN VARCHAR2 IS VAR_SUPER_FLOW_RUN_STATUS VARCHAR2(10); BEGIN SELECT ETL_FLOW_RUN_STATUS INTO VAR_SUPER_FLOW_RUN_STATUS FROM ETL_CTL_JOB_FLOW T ,ETL_FLOW_RUN_STS A WHERE T.ETL_SUPER_FLOW_ID = A.ETL_FLOW_ID AND T.ETL_FLOW_ID = I_FLOW_ID; RETURN VAR_SUPER_FLOW_RUN_STATUS; END; /******************************************************************* 程序名 :FN_GET_SUPER_DATA_START_TIME 创建人 : zhuyh 创建时间 : 2013/8/20 功能描述 : 获取上级作业流数据开始时间 修改人 : 修改时间 : 修改原因 : *******************************************************************/ FUNCTION FN_GET_SUPER_DATA_START_TIME(I_FLOW_ID NUMBER) RETURN DATE IS VAR_SUPER_FLOW_DATA_START_TIME DATE; BEGIN SELECT A.ETL_DATA_START_TIME INTO VAR_SUPER_FLOW_DATA_START_TIME FROM ETL_CTL_JOB_FLOW T ,ETL_FLOW_RUN_STS A WHERE T.ETL_SUPER_FLOW_ID = A.ETL_FLOW_ID AND T.ETL_FLOW_ID = I_FLOW_ID; RETURN VAR_SUPER_FLOW_DATA_START_TIME; END; /******************************************************************* 程序名 :FN_GET_SUPER_DATA_END_TIME 创建人 : zhuyh 创建时间 : 2013/8/20 功能描述 : 获取上级作业流数据结束时间 修改人 : 修改时间 : 修改原因 : *******************************************************************/ FUNCTION FN_GET_SUPER_DATA_END_TIME(I_FLOW_ID NUMBER) RETURN DATE IS VAR_SUPER_FLOW_DATA_END_TIME DATE; BEGIN SELECT A.ETL_DATA_END_TIME INTO VAR_SUPER_FLOW_DATA_END_TIME FROM ETL_CTL_JOB_FLOW T ,ETL_FLOW_RUN_STS A WHERE T.ETL_SUPER_FLOW_ID = A.ETL_FLOW_ID AND T.ETL_FLOW_ID = I_FLOW_ID; RETURN VAR_SUPER_FLOW_DATA_END_TIME; END; /******************************************************************* 程序名 :FN_GET_CYC_CODE 创建人 : zhuyh 创建时间 : 2013/8/26 功能描述 : 获取周期代码 修改人 : 修改时间 : 修改原因 : *******************************************************************/ FUNCTION FN_GET_CYC_CODE(I_FLOW_ID VARCHAR2) RETURN VARCHAR2 IS VAR_CYC_CODE VARCHAR2(100); BEGIN SELECT T.ETL_CYC_CODE INTO VAR_CYC_CODE FROM ETL_CTL_JOB_FLOW T WHERE T.ETL_FLOW_ID = I_FLOW_ID; RETURN VAR_CYC_CODE; END; END PKG_ETL_CTL; /
试试其它关键字
ETL调度
同语言下
.
查看某张表的表结构
.
oracle 集合操作
.
for 循环中倒序 用reverse
.
Oracle 临时表空间收缩
.
查看session及其对应运行的sql
.
Oracle常用查看表结构命令
.
Oracle Connect By Prior用法(实现递归查询)
.
重置排序 SEQUENCE
.
创建一个trigger
.
批量替换字段中的字符
可能有用的
.
C#实现的html内容截取
.
List 切割成几份 工具类
.
SQL查询 多列合并成一行用逗号隔开
.
一行一行读取txt的内容
.
C#动态修改文件夹名称(FSO实现,不移动文件)
.
c# 移动文件或文件夹
.
c#图片添加水印
.
Java PDF转换成图片并输出给前台展示
.
网站后台修改图片尺寸代码
.
处理大图片在缩略图时的展示
ynihui
贡献的其它代码
(
1
)
.
ETL调度
Copyright © 2004 - 2024 dezai.cn. All Rights Reserved
站长博客
粤ICP备13059550号-3