001package org.opengion.plugin.daemon;
002
003import java.io.File;
004import java.util.Date;
005
006import javax.jms.JMSException;
007import javax.jms.Message;
008import javax.jms.MessageListener;
009import javax.jms.TextMessage;
010
011import org.opengion.fukurou.business.BizUtil;
012import org.opengion.fukurou.queue.QueueInfo;
013import org.opengion.fukurou.queue.QueueReceive;
014import org.opengion.fukurou.queue.QueueReceiveFactory;
015import org.opengion.fukurou.util.HybsTimerTask;
016import org.opengion.fukurou.util.StringUtil;
017import org.opengion.hayabusa.common.HybsSystem;
018import org.opengion.hayabusa.common.HybsSystemException;
019import org.opengion.hayabusa.queue.DBAccessQueue;
020
021/**
022 * メッセージキュー受信 メッセージキューの受信処理を行います。
023 * 
024 * @og.group メッセージ連携
025 * 
026 * @og.rev 5.10.15.2 (2019/09/20) 新規作成
027 *
028 * @version 5.0
029 * @author oota
030 * @since JDK7
031 *
032 */
033public class Daemon_QueueReceive extends HybsTimerTask {
034        private int loopCnt = 0;
035        private QueueReceive queueReceive = null;
036
037        private static final int LOOP_COUNTER = 24;
038
039        private final String CLOUD_SQS_ACCESS_KEY = HybsSystem.sys("CLOUD_SQS_ACCESS_KEY");
040        private final String CLOUD_SQS_SECRET_KEY = HybsSystem.sys("CLOUD_SQS_SECRET_KEY");
041        private final String MQ_QUEUE_TYPE;
042        private final String MQ_QURUE_SERVER_URL = HybsSystem.sys("MQ_QUEUE_SERVER_URL");
043        private final String MQ_QUEUE_RECEIVE_LISTENER =HybsSystem.sys("MQ_QUEUE_RECEIVE_LISTENER"); 
044
045        private final String SYSTEM_ID = HybsSystem.sys("SYSTEM_ID");
046        private final String USER_ID = "CYYYYY";
047        private final String PG_ID;
048        private final String DMN_NAME = "QueueReceiveDMN";
049        private final DBAccessQueue dbAccessQueue;
050
051        /**
052         * コンストラクター
053         * 初期処理を行います。
054         */
055        public Daemon_QueueReceive() {
056                super();
057                
058                // パラメータの設定
059                if(!StringUtil.isNull(HybsSystem.sys("MQ_QUEUE_TYPE"))) {
060                        MQ_QUEUE_TYPE = HybsSystem.sys("MQ_QUEUE_TYPE").toUpperCase();
061                        PG_ID = StringUtil.cut("QueRec" + MQ_QUEUE_TYPE, 10);
062                }else {
063                        throw new RuntimeException("システムリソースにMQ_QUEUE_TYPEを登録して下さい");
064                }
065
066                dbAccessQueue = new DBAccessQueue(SYSTEM_ID, USER_ID, PG_ID, DMN_NAME);
067        }
068
069        /**
070         * 初期処理 MQサーバに接続します。
071         */
072        @Override
073        public void initDaemon() {
074                // 開始ログO
075                StringBuilder errMsg = new StringBuilder();
076                if (MQ_QUEUE_TYPE == null) {
077                        errMsg.append("MQ_QUEUE_TYPE");
078                }
079                if (MQ_QURUE_SERVER_URL == null) {
080                        errMsg.append(" MQ_QUEUE_SERVER_URL");
081                }
082
083                if (errMsg.length() > 0) {
084                        errMsg.append(" キュータイプを特定するために、左記のシステムリソースを登録して下さい。");
085                        throw new HybsSystemException(errMsg.toString());
086                }
087
088                String queueType = MQ_QUEUE_TYPE.toUpperCase();
089
090                // 開始ログ
091                System.out.println("MQキュータイプ:" + queueType);
092                System.out.println("MQサーバーURL:" + MQ_QURUE_SERVER_URL);
093
094                queueReceive = QueueReceiveFactory.newQueueReceive(queueType);
095
096                queueReceive.connect(MQ_QURUE_SERVER_URL, CLOUD_SQS_ACCESS_KEY, CLOUD_SQS_SECRET_KEY);
097
098        }
099
100        /**
101         * 開始処理 タイマータスクのデーモン処理の開始ポイントです。
102         */
103        @Override
104        protected void startDaemon() {
105                if (loopCnt % LOOP_COUNTER == 0) {
106                        loopCnt = 1;
107                        System.out.println();
108                        System.out.print(toString() + " " + new Date() + " ");
109                } else {
110                        // 対象 キュー名(グループ名)とbizlogic名の取得処理
111                        String[][] ge67vals = dbAccessQueue.setlectGE67();
112                        // キュー情報登録チェック
113                        if (ge67vals.length == 0) {
114                                String errMsg = "GE67にキュー情報が登録されていません。";
115                                throw new RuntimeException(errMsg);
116                        }
117                        // MQとSQSで処理を分岐
118                        // MQ:指定キューIDからキューメッセージを取得
119                        // SQS:キューメッセージを取得してからキューID(グループID)を取得
120                        switch (MQ_QUEUE_TYPE) {
121                                case "MQ":
122                                        processMq(ge67vals);
123                                        break;
124                                case "SQS":
125                                        processSqs(ge67vals);
126                                        break;
127                                default:
128                                        String errMsg = "リソース(MQ_QUEUE_TYPE)の値が不正です。:" + MQ_QUEUE_TYPE;
129                                        throw new RuntimeException(errMsg);
130                        }
131
132                        loopCnt++;
133                }
134        }
135
136        /**
137         * MQ用の処理
138         * GE67に登録されているキューIDの、
139         * メッセージキューを取得して処理を行います。
140         * 
141         * @param ge67vals
142         */
143        private void processMq(final String[][] ge67vals) {
144                boolean listenerMode = false;
145                
146                if("true".equals(MQ_QUEUE_RECEIVE_LISTENER)) {
147                        listenerMode = true;
148                }
149                
150                if(listenerMode) {
151                        // リスナーの初期化
152                        queueReceive.closeListener();
153                }
154
155                // ge67のキューリスト分繰り返します
156                for (int row = 0; row < ge67vals.length; row++) {
157                        String queueId = ge67vals[row][0];
158                        String bizLogicId = ge67vals[row][1];
159
160                        if(listenerMode) {
161                                // リスナーを設定して、動的な受信処理(MQ専用)
162                                QueueReceiveListener listener = new QueueReceiveListener(queueId, bizLogicId);
163                                queueReceive.setListener(queueId, listener);
164                        }else {
165                                // 1件の受信処理
166                                QueueInfo queueInfo = queueReceive.receive(queueId);
167                                if (queueInfo != null) {
168                                        processMessage(queueId, bizLogicId, queueInfo.getMessage());
169                                        // 1件処理を行ったら処理を終了します。
170                                        break;
171                                }
172                        }
173                }
174        }
175
176        /**
177         * SQS用の処理
178         * SQSはグループIDを指定して、キューを取得することはできず、
179         * 任意のキューを1つ取得してから、
180         * 判定処理を行います。
181         * GE67に登録されていないグループIDのキューが取得された場合は、
182         * GE68にエラーレコードを登録します。
183         * 
184         * @param ge67vals
185         */
186        private void processSqs(final String[][] ge67vals) {
187                // 下記はSQSの場合(キューを1件取得して処理)
188                QueueInfo queueInfo = queueReceive.receive(null);
189                
190                // キューが未取得の場合
191                if(queueInfo == null) {
192                        return;
193                }
194                
195                // 受信したキューを処理
196                final String groupId = queueInfo.getSqsFifoGroupId();
197                Boolean existsFlg = false;
198                // valsにグループIDのレコードが存在するか検索
199                for (int row = 0; row < ge67vals.length; row++) {
200                        final String queueId = ge67vals[row][0];
201
202                        if (groupId != null && groupId.equals(queueId)) {
203                                // 該当レコードあり
204                                final String bizLogicId = ge67vals[row][1];
205                                processMessage(queueId, bizLogicId, queueInfo.getMessage());
206
207                                existsFlg = true;
208                                break;
209                        }
210                }
211
212                if (!existsFlg) {
213                        // 該当groupIdの未登録エラー
214                        // 処理番号生成
215                        String syoriNo = dbAccessQueue.generateSyoriNo();
216                        dbAccessQueue.insertGE68(groupId, syoriNo, null, queueInfo.getMessage());
217                        dbAccessQueue.updateGE68Error(syoriNo, "SQSキューに設定されているグループIDが、GE67に未登録です。");
218                }
219        }
220
221        /**
222         * キャンセル処理
223         * タイマータスクのデーモン処理の終了ポイントです。
224         */
225        @Override
226        public boolean cancel() {
227                if (queueReceive != null) {
228                        queueReceive.close();
229                }
230
231                return super.cancel();
232        }
233
234        /**
235         * メッセージの処理
236         *  受信したメッセージをbizLogicに渡して、
237         *  処理を実行します。
238         *  
239         * @param queueId キューID
240         * @param bizLogicId bizLogicId
241         * @param msgText 受信メッセージ
242         */
243        private void processMessage(final String queueId, final String bizLogicId, final String msgText) {
244                String syoriNo = "";
245                try {
246                        // 処理番号生成
247                        syoriNo = dbAccessQueue.generateSyoriNo();
248
249                        // 管理テーブル登録
250                        dbAccessQueue.insertGE68(queueId, syoriNo, bizLogicId, msgText);
251
252                        // bizLogicの処理を実行
253                        callActBizLogic(SYSTEM_ID, bizLogicId, msgText);
254
255                        // 管理テーブル更新(完了)
256                        dbAccessQueue.updateGE68(syoriNo, DBAccessQueue.FGKAN_END);
257
258                } catch (Throwable te) {
259                        // bizLogicでのエラーはログの未出力して、処理を継続します。
260                        // bizLogicのエラー情報はCauseに格納されているため、Causeから取得します。
261                        String errMessage = null;
262                        if (te.getCause() != null) {
263                                // causeが設定されている場合のエラー情報
264                                errMessage = te.getCause().getMessage();
265                        } else {
266                                // causeが未設定の場合のエラー情報
267                                errMessage = te.getMessage();
268                        }
269                        System.out.println(errMessage);
270                        try {
271                                // エラーテーブルに登録
272                                dbAccessQueue.updateGE68Error(syoriNo, errMessage);
273                        } catch (Exception e) {
274                                // ここでのエラーはスルーします。
275                                System.out.println("管理テーブル登録エラー:" + e.getMessage());
276                        }
277                }
278        }
279
280        /**
281         * bizLogic処理の呼び出し
282         * 必要なパス情報をリソースから取得して、
283         * BizUtil.actBizLogicにパス情報を渡すことで、
284         * bizLogicの処理を行います。
285         * 
286         * @param systemId  システムID
287         * @param logicName ロジックファイル名
288         * @param msgText   メッセージ
289         * @throws Throwable エラー情報
290         */
291        private void callActBizLogic(String systemId, String logicName, String msgText) throws Throwable {
292                // 対象 クラスパスの生成
293                // HotDeploy機能を使用する場合に、Javaクラスをコンパイルするためのクラスパスを設定します。
294                // 対象となるクラスパスは、WEB-INF/classes 及び WEB-INF/lib/*.jar です。
295                // bizLogicTag.javaのコードを移植
296                StringBuilder sb = new StringBuilder();
297                sb.append('.').append(File.pathSeparatorChar);
298                File lib = new File(HybsSystem.sys("REAL_PATH") + "WEB-INF" + File.separator + "lib");
299                File[] libFiles = lib.listFiles();
300                for (int i = 0; i < libFiles.length; i++) {
301                        sb.append(libFiles[i].getAbsolutePath()).append(File.pathSeparatorChar);
302                }
303                sb.append(HybsSystem.sys("REAL_PATH") + "WEB-INF" + File.separator + "classes").append(File.pathSeparatorChar);
304                // bizの下のパス
305                sb.append(HybsSystem.sys("REAL_PATH") + HybsSystem.sys("BIZLOGIC_CLASS_PATH")).append(File.pathSeparatorChar);
306                // 上記で生成したクラスパスをclassPathに格納
307                String classPath = sb.toString();
308
309                // ソースパス情報の生成
310                String srcDir = HybsSystem.sys("REAL_PATH") + HybsSystem.sys("BIZLOGIC_SRC_PATH");
311                String classDir = HybsSystem.sys("REAL_PATH") + HybsSystem.sys("BIZLOGIC_CLASS_PATH");
312                boolean isAutoCompile = HybsSystem.sysBool("BIZLOGIC_AUTO_COMPILE");
313                boolean isHotDeploy = HybsSystem.sysBool("BIZLOGIC_HOT_DEPLOY");
314
315                // bizLogicに渡すパラメータ
316                String[] keys = new String[] { "message" };
317                String[] vals = new String[] { msgText };
318
319                // bizLogic処理の実行
320                BizUtil.actBizLogic(srcDir, classDir, isAutoCompile, isHotDeploy, classPath, systemId, logicName, keys, vals);
321        }
322
323        /**
324         * 受信処理リスナー用のインナークラス
325         * QueueReceiveリスナークラス リスナー用のクラスです。
326         *  MQに設定することで、メッセージが受信されると、
327         * onMessageメソッドが実行されます。
328         *
329         */
330        class QueueReceiveListener implements MessageListener {
331                private String queueId = "";
332                private String bizLogicId = "";
333
334                /**
335                 * コンストラクター 初期処理を行います。
336                 * 
337                 * @param quId  キューID
338                 * @param bizId bizLogicId
339                 */
340                public QueueReceiveListener(final String quId, final String bizId) {
341                        queueId = quId;
342                        bizLogicId = bizId;
343                }
344
345                /**
346                 * メッセージ受信処理 MQサーバにメッセージが受信されると、 メソッドの処理が行われます。
347                 * 
348                 * @param Message 受信メッセージ
349                 */
350                @Override
351                public void onMessage(final Message message) {
352                        // 要求番号
353                        String ykno = "";
354
355                        // メッセージ受信
356                        TextMessage msg = (TextMessage) message;
357                        String msgText = "";
358
359                        try {
360                                // キューサーバのメッセージを取得
361                                msgText = msg.getText();
362
363                                // メーッセージの受信応答を返します。
364                                msg.acknowledge();
365
366                                processMessage(queueId, bizLogicId, msgText);
367
368                        } catch (JMSException jmse) {
369                                try {
370                                        // 管理テーブル更新
371                                        // 管理テーブル更新(エラー)
372                                        dbAccessQueue.updateGE68(ykno, DBAccessQueue.FGKAN_ERROR);
373                                } catch (Exception e) {
374                                        // ここでのエラーはスルーします。
375                                        System.out.println("管理テーブル登録エラー:" + e.getMessage());
376                                }
377
378                                throw new HybsSystemException("bizLogicの処理中にエラーが発生しました。" + jmse.getMessage());
379                        }
380                }
381        }
382}