001package org.opengion.plugin.daemon;
002
003import java.io.File;
004import java.util.Date;
005
006import javax.jms.JMSException;
007import javax.jms.Message;
008import javax.jms.MessageListener;
009import javax.jms.TextMessage;
010
011// import org.opengion.fukurou.util.BizUtil;
012import org.opengion.fukurou.business.BizUtil;
013import org.opengion.fukurou.queue.QueueInfo;
014import org.opengion.fukurou.queue.QueueReceive;
015import org.opengion.fukurou.queue.QueueReceiveFactory;
016import org.opengion.fukurou.util.HybsTimerTask;
017import org.opengion.fukurou.util.StringUtil;
018import org.opengion.hayabusa.common.HybsSystem;
019import org.opengion.hayabusa.common.HybsSystemException;
020import org.opengion.hayabusa.queue.DBAccessQueue;
021
022/**
023 * メッセージキュー受信 メッセージキューの受信処理を行います。
024 * 
025 * @og.group メッセージ連携
026 * 
027 * @og.rev 5.10.15.2 (2019/09/20) 新規作成
028 *
029 * @version 5.0
030 * @author oota
031 * @since JDK7
032 *
033 */
034public class Daemon_QueueReceive extends HybsTimerTask {
035        private int loopCnt = 0;
036        private QueueReceive queueReceive = null;
037
038        private static final int LOOP_COUNTER = 24;
039
040        private final String CLOUD_SQS_ACCESS_KEY = HybsSystem.sys("CLOUD_SQS_ACCESS_KEY");
041        private final String CLOUD_SQS_SECRET_KEY = HybsSystem.sys("CLOUD_SQS_SECRET_KEY");
042        private final String MQ_QUEUE_TYPE;
043        private final String MQ_QURUE_SERVER_URL = HybsSystem.sys("MQ_QUEUE_SERVER_URL");
044        private final String MQ_QUEUE_RECEIVE_LISTENER =HybsSystem.sys("MQ_QUEUE_RECEIVE_LISTENER"); 
045
046        private final String SYSTEM_ID = HybsSystem.sys("SYSTEM_ID");
047        private final String USER_ID = "CYYYYY";
048        private final String PG_ID;
049        private final String DMN_NAME = "QueueReceiveDMN";
050        private final DBAccessQueue dbAccessQueue;
051
052        /**
053         * コンストラクター
054         * 初期処理を行います。
055         */
056        public Daemon_QueueReceive() {
057                super();
058                
059                // パラメータの設定
060                if(!StringUtil.isNull(HybsSystem.sys("MQ_QUEUE_TYPE"))) {
061                        MQ_QUEUE_TYPE = HybsSystem.sys("MQ_QUEUE_TYPE").toUpperCase();
062                        PG_ID = StringUtil.cut("QueRec" + MQ_QUEUE_TYPE, 10);
063                }else {
064                        throw new RuntimeException("システムリソースにMQ_QUEUE_TYPEを登録して下さい");
065                }
066
067                dbAccessQueue = new DBAccessQueue(SYSTEM_ID, USER_ID, PG_ID, DMN_NAME);
068        }
069
070        /**
071         * 初期処理 MQサーバに接続します。
072         */
073        @Override
074        public void initDaemon() {
075                // 開始ログO
076                final StringBuilder errMsg = new StringBuilder();
077                if (MQ_QUEUE_TYPE == null) {
078                        errMsg.append("MQ_QUEUE_TYPE");
079                }
080                if (MQ_QURUE_SERVER_URL == null) {
081                        errMsg.append(" MQ_QUEUE_SERVER_URL");
082                }
083
084                if (errMsg.length() > 0) {
085                        errMsg.append(" キュータイプを特定するために、左記のシステムリソースを登録して下さい。");
086                        throw new HybsSystemException(errMsg.toString());
087                }
088
089                final String queueType = MQ_QUEUE_TYPE.toUpperCase();
090
091                // 開始ログ
092                System.out.println("MQキュータイプ:" + queueType);
093                System.out.println("MQサーバーURL:" + MQ_QURUE_SERVER_URL);
094
095                queueReceive = QueueReceiveFactory.newQueueReceive(queueType);
096
097                queueReceive.connect(MQ_QURUE_SERVER_URL, CLOUD_SQS_ACCESS_KEY, CLOUD_SQS_SECRET_KEY);
098
099        }
100
101        /**
102         * 開始処理 タイマータスクのデーモン処理の開始ポイントです。
103         */
104        @Override
105        protected void startDaemon() {
106                if (loopCnt % LOOP_COUNTER == 0) {
107                        loopCnt = 1;
108                        System.out.println();
109                        System.out.print(toString() + " " + new Date() + " ");
110                } else {
111                        // 対象 キュー名(グループ名)とbizlogic名の取得処理
112                        final String[][] ge67vals = dbAccessQueue.setlectGE67();
113                        // キュー情報登録チェック
114                        if (ge67vals.length == 0) {
115                                final String errMsg = "GE67にキュー情報が登録されていません。";
116                                throw new RuntimeException(errMsg);
117                        }
118                        // MQとSQSで処理を分岐
119                        // MQ:指定キューIDからキューメッセージを取得
120                        // SQS:キューメッセージを取得してからキューID(グループID)を取得
121                        switch (MQ_QUEUE_TYPE) {
122                                case "MQ":
123                                        processMq(ge67vals);
124                                        break;
125                                case "SQS":
126                                        processSqs(ge67vals);
127                                        break;
128                                default:
129                                        final String errMsg = "リソース(MQ_QUEUE_TYPE)の値が不正です。:" + MQ_QUEUE_TYPE;
130                                        throw new RuntimeException(errMsg);
131                        }
132
133                        loopCnt++;
134                }
135        }
136
137        /**
138         * MQ用の処理
139         * GE67に登録されているキューIDの、
140         * メッセージキューを取得して処理を行います。
141         * 
142         * @param ge67vals GE67の配列データ
143         */
144        private void processMq(final String[][] ge67vals) {
145                boolean listenerMode = false;
146                
147                if("true".equals(MQ_QUEUE_RECEIVE_LISTENER)) {
148                        listenerMode = true;
149                }
150                
151                if(listenerMode) {
152                        // リスナーの初期化
153                        queueReceive.closeListener();
154                }
155
156                // ge67のキューリスト分繰り返します
157                for (int row = 0; row < ge67vals.length; row++) {
158                        final String queueId = ge67vals[row][0];
159                        final String bizLogicId = ge67vals[row][1];
160
161                        if(listenerMode) {
162                                // リスナーを設定して、動的な受信処理(MQ専用)
163                                final QueueReceiveListener listener = new QueueReceiveListener(queueId, bizLogicId);
164                                queueReceive.setListener(queueId, listener);
165                        }else {
166                                // 1件の受信処理
167                                final QueueInfo queueInfo = queueReceive.receive(queueId);
168                                if (queueInfo != null) {
169                                        processMessage(queueId, bizLogicId, queueInfo.getMessage());
170                                        // 1件処理を行ったら処理を終了します。
171                                        break;
172                                }
173                        }
174                }
175        }
176
177        /**
178         * SQS用の処理
179         * SQSはグループIDを指定して、キューを取得することはできず、
180         * 任意のキューを1つ取得してから、
181         * 判定処理を行います。
182         * GE67に登録されていないグループIDのキューが取得された場合は、
183         * GE68にエラーレコードを登録します。
184         * 
185         * @param ge67vals GE67の配列データ
186         */
187        private void processSqs(final String[][] ge67vals) {
188                // 下記はSQSの場合(キューを1件取得して処理)
189                final QueueInfo queueInfo = queueReceive.receive(null);
190                
191                // キューが未取得の場合
192                if(queueInfo == null) {
193                        return;
194                }
195                
196                // 受信したキューを処理
197                final String groupId = queueInfo.getSqsFifoGroupId();
198                Boolean existsFlg = false;
199                // valsにグループIDのレコードが存在するか検索
200                for (int row = 0; row < ge67vals.length; row++) {
201                        final String queueId = ge67vals[row][0];
202
203                        if (groupId != null && groupId.equals(queueId)) {
204                                // 該当レコードあり
205                                final String bizLogicId = ge67vals[row][1];
206                                processMessage(queueId, bizLogicId, queueInfo.getMessage());
207
208                                existsFlg = true;
209                                break;
210                        }
211                }
212
213                if (!existsFlg) {
214                        // 該当groupIdの未登録エラー
215                        // 処理番号生成
216                        final String syoriNo = dbAccessQueue.generateSyoriNo();
217                        dbAccessQueue.insertGE68(groupId, syoriNo, null, queueInfo.getMessage());
218                        dbAccessQueue.updateGE68Error(syoriNo, "SQSキューに設定されているグループIDが、GE67に未登録です。");
219                }
220        }
221
222        /**
223         * キャンセル処理
224         * タイマータスクのデーモン処理の終了ポイントです。
225         *
226         * @return キャンセルできれば、true
227         */
228        @Override
229        public boolean cancel() {
230                if (queueReceive != null) {
231                        queueReceive.close();
232                }
233
234                return super.cancel();
235        }
236
237        /**
238         * メッセージの処理
239         *  受信したメッセージをbizLogicに渡して、
240         *  処理を実行します。
241         *  
242         * @param queueId キューID
243         * @param bizLogicId ビズロジックID
244         * @param msgText 受信メッセージ
245         */
246        private void processMessage(final String queueId, final String bizLogicId, final String msgText) {
247                String syoriNo = "";
248                try {
249                        // 処理番号生成
250                        syoriNo = dbAccessQueue.generateSyoriNo();
251
252                        // 管理テーブル登録
253                        dbAccessQueue.insertGE68(queueId, syoriNo, bizLogicId, msgText);
254
255                        // bizLogicの処理を実行
256                        callActBizLogic(SYSTEM_ID, bizLogicId, msgText);
257
258                        // 管理テーブル更新(完了)
259                        dbAccessQueue.updateGE68(syoriNo, DBAccessQueue.FGKAN_END);
260
261                } catch (Throwable te) {
262                        // bizLogicでのエラーはログの未出力して、処理を継続します。
263                        // bizLogicのエラー情報はCauseに格納されているため、Causeから取得します。
264                        String errMessage = null;
265                        if (te.getCause() != null) {
266                                // causeが設定されている場合のエラー情報
267                                errMessage = te.getCause().getMessage();
268                        } else {
269                                // causeが未設定の場合のエラー情報
270                                errMessage = te.getMessage();
271                        }
272                        System.out.println(errMessage);
273                        try {
274                                // エラーテーブルに登録
275                                dbAccessQueue.updateGE68Error(syoriNo, errMessage);
276                        } catch (Exception e) {
277                                // ここでのエラーはスルーします。
278                                System.out.println("管理テーブル登録エラー:" + e.getMessage());
279                        }
280                }
281        }
282
283        /**
284         * bizLogic処理の呼び出し
285         * 必要なパス情報をリソースから取得して、
286         * BizUtil.actBizLogicにパス情報を渡すことで、
287         * bizLogicの処理を行います。
288         * 
289         * @param systemId  システムID
290         * @param logicName ロジックファイル名
291         * @param msgText   メッセージ
292         * @throws Throwable エラー情報
293         */
294        private void callActBizLogic(final String systemId, final String logicName, final String msgText) throws Throwable {
295                // 対象 クラスパスの生成
296                // HotDeploy機能を使用する場合に、Javaクラスをコンパイルするためのクラスパスを設定します。
297                // 対象となるクラスパスは、WEB-INF/classes 及び WEB-INF/lib/*.jar です。
298                // bizLogicTag.javaのコードを移植
299                final StringBuilder sb = new StringBuilder();
300                sb.append('.').append(File.pathSeparatorChar);
301                final File lib = new File(HybsSystem.sys("REAL_PATH") + "WEB-INF" + File.separator + "lib");
302                final File[] libFiles = lib.listFiles();
303                for (int i = 0; i < libFiles.length; i++) {
304                        sb.append(libFiles[i].getAbsolutePath()).append(File.pathSeparatorChar);
305                }
306                sb.append(HybsSystem.sys("REAL_PATH") + "WEB-INF" + File.separator + "classes").append(File.pathSeparatorChar);
307                // bizの下のパス
308                sb.append(HybsSystem.sys("REAL_PATH") + HybsSystem.sys("BIZLOGIC_CLASS_PATH")).append(File.pathSeparatorChar);
309                // 上記で生成したクラスパスをclassPathに格納
310                final String classPath = sb.toString();
311
312                // ソースパス情報の生成
313                final String srcDir = HybsSystem.sys("REAL_PATH") + HybsSystem.sys("BIZLOGIC_SRC_PATH");
314                final String classDir = HybsSystem.sys("REAL_PATH") + HybsSystem.sys("BIZLOGIC_CLASS_PATH");
315                final boolean isAutoCompile = HybsSystem.sysBool("BIZLOGIC_AUTO_COMPILE");
316                final boolean isHotDeploy = HybsSystem.sysBool("BIZLOGIC_HOT_DEPLOY");
317
318                // bizLogicに渡すパラメータ
319                final String[] keys = new String[] { "message" };
320                final String[] vals = new String[] { msgText };
321
322                // bizLogic処理の実行
323                BizUtil.actBizLogic(srcDir, classDir, isAutoCompile, isHotDeploy, classPath, systemId, logicName, keys, vals);
324        }
325
326        /**
327         * 受信処理リスナー用のインナークラス
328         * QueueReceiveリスナークラス リスナー用のクラスです。
329         *  MQに設定することで、メッセージが受信されると、
330         * onMessageメソッドが実行されます。
331         *
332         */
333        class QueueReceiveListener implements MessageListener {
334//              private String queueId = "";
335//              private String bizLogicId = "";
336                private final String queueId ;
337                private final String bizLogicId ;
338
339                /**
340                 * コンストラクター 初期処理を行います。
341                 * 
342                 * @param quId  キューID
343                 * @param bizId ビズロジックID
344                 */
345                public QueueReceiveListener(final String quId, final String bizId) {
346                        queueId    = quId;
347                        bizLogicId = bizId;
348                }
349
350                /**
351                 * メッセージ受信処理 MQサーバにメッセージが受信されると、 メソッドの処理が行われます。
352                 * 
353                 * @param message 受信メッセージ
354                 */
355                @Override
356                public void onMessage(final Message message) {
357                        // 要求番号
358                        String ykno = "";
359
360                        // メッセージ受信
361                        final TextMessage msg = (TextMessage) message;
362                        String msgText = "";
363
364                        try {
365                                // キューサーバのメッセージを取得
366                                msgText = msg.getText();
367
368                                // メーッセージの受信応答を返します。
369                                msg.acknowledge();
370
371                                processMessage(queueId, bizLogicId, msgText);
372
373                        } catch (JMSException jmse) {
374                                try {
375                                        // 管理テーブル更新
376                                        // 管理テーブル更新(エラー)
377                                        dbAccessQueue.updateGE68(ykno, DBAccessQueue.FGKAN_ERROR);
378                                } catch (Exception e) {
379                                        // ここでのエラーはスルーします。
380                                        System.out.println("管理テーブル登録エラー:" + e.getMessage());
381                                }
382
383                                throw new HybsSystemException("bizLogicの処理中にエラーが発生しました。" + jmse.getMessage());
384                        }
385                }
386        }
387}