001package org.opengion.plugin.daemon;
002
003import java.util.Date;
004
005import javax.jms.QueueSession;
006
007import org.opengion.fukurou.util.StringUtil;
008import org.opengion.fukurou.queue.QueueInfo;
009import org.opengion.fukurou.queue.QueueSend;
010import org.opengion.fukurou.queue.QueueSendFactory;
011import org.opengion.fukurou.util.HybsTimerTask;
012import org.opengion.hayabusa.common.HybsSystem;
013import org.opengion.hayabusa.queue.DBAccessQueue;
014
015/**
016 * メッセージキュー送信
017 * メッセージキュー送信テーブルを監視して、
018 * 送信処理を行います。
019 * 
020 * @og.group メッセージ連携
021 *
022 * @og.rev 5.10.15.0 (2019/08/30) 新規作成
023 * @og.rev 5.10.15.2 (2019/09/20) DB登録の実装をhayabusa.queueに移動
024 * 
025 * @version 5.0
026 * @author oota
027 * @since JDK7
028 *
029 */
030public class Daemon_QueueSend extends HybsTimerTask {
031        private int loopCnt = 0;
032        private static final int LOOP_COUNTER = 24;
033        private QueueSend queueSend;
034
035        private String SYSTEM_ID = HybsSystem.sys("SYSTEM_ID");
036        private static final String CLOUD_SQS_ACCESS_KEY = HybsSystem.sys( "CLOUD_SQS_ACCESS_KEY" );
037        private static final String CLOUD_SQS_SECRET_KEY = HybsSystem.sys( "CLOUD_SQS_SECRET_KEY" );
038        private final String USER_ID = "CYYYYY";
039        private final String PG_ID = "DMN_QueSnd";
040        private final String DMN_NAME = "QueueReceiveDMN";
041        private final DBAccessQueue dbAccessQueue;
042        
043        /**
044         * コンストラクター
045         * 初期処理を行います。
046         */
047        public Daemon_QueueSend(){
048                dbAccessQueue = new DBAccessQueue(SYSTEM_ID, USER_ID, PG_ID, DMN_NAME);
049        }
050        /**
051         * 開始処理
052         * 
053         * 
054         * タイマータスクのデーモン処理の開始ポイントです。
055         * 
056         * @og.rev 5.10.16.1 (2019/10/11) StringUtil変更
057         * 
058         */
059        @Override
060        protected void startDaemon() {
061                if (loopCnt % LOOP_COUNTER == 0) {
062                        loopCnt = 1;
063                        System.out.println();
064                        System.out.println(toString() + " " + new Date() + "");
065                } else {
066                        // メッセージキュー送信管理テーブルから、送信対象のレコードを取得
067                        String[][] vals = dbAccessQueue.selectGE65();
068                        
069                        // 取得データ分の繰り返し処理を実行する
070                        for(int i = 0; i  < vals.length; i++) {
071                                String[] record = vals[i];
072                                
073                                // GE65から取得した値を変数に格納
074                                String ykno =  record[0];
075                                String queueId = record[1];
076                                String message = record[2];
077                                String dedupliId = record[3];
078                                String queSyu = record[4];
079                                String jmsUrl = record[5];
080                                
081                                String queueType = queSyu.toUpperCase();
082                                queueSend = QueueSendFactory.newQueueSend(queueType);
083        
084                                // 接続処理
085                                queueSend.connect(jmsUrl, CLOUD_SQS_ACCESS_KEY, CLOUD_SQS_SECRET_KEY);
086                                
087                                // メッセージ送信管理テーブルから取得したデータを送信実装予定
088                                QueueInfo queueInfo = new QueueInfo();
089                                
090                                // 応答確認種別
091                                if("MQ".equals(queueType)){
092                                        // MQメッセージサーバ指定時
093                                        queueInfo.setMqTransacted(false);
094                                        queueInfo.setMqAcknowledgeMode(QueueSession.AUTO_ACKNOWLEDGE);
095                                        // キュー名
096                                        queueInfo.setMqQueueName(queueId);
097                                }else if("SQS".equals(queueType)){
098                                        // SQSメッセージサーバ指定時
099                                        // グループID
100                                        queueInfo.setSqsFifoGroupId(queueId);
101//                                      if(!StringUtil.isEmpty(dedupliId)) {
102                                        if(!StringUtil.isNull(dedupliId)) {     
103                                                // 重複排除ID
104                                                // コンテンツに基づく重複排除が有効時は、未設定でも可(メッセージによる重複判定が行われる)
105                                                queueInfo.setSqsFifoDedupliId(dedupliId);
106                                        }
107                                }
108                        
109                                // メッセージ
110                                queueInfo.setMessage(message);
111                        
112                                // 完了フラグを処理中:2に更新
113                                dbAccessQueue.updateGE66(ykno, DBAccessQueue.FGKAN_PROCESS);
114                                
115                                // メッセージ送信処理
116                                try{
117                                        queueSend.sendMessage(queueInfo);
118                                        
119                                        // 完了フラグを完了:3に更新
120                                        dbAccessQueue.updateGE66(ykno, DBAccessQueue.FGKAN_END);
121                                        
122                                }catch(Exception e) {
123                                        // 完了フラグをエラー:4に更新して、エラー情報を登録
124                                        dbAccessQueue.updateGE66Error(ykno, e.getMessage());
125                                } 
126                        }
127                        
128                        // クローズ処理
129                        queueSend.close();
130                        
131                        loopCnt++;
132                }
133        }
134}