001package org.opengion.plugin.daemon; 002 003import java.io.File; 004import java.util.Date; 005 006import javax.jms.JMSException; 007import javax.jms.Message; 008import javax.jms.MessageListener; 009import javax.jms.TextMessage; 010 011import org.opengion.fukurou.business.BizUtil; 012import org.opengion.fukurou.queue.QueueInfo; 013import org.opengion.fukurou.queue.QueueReceive; 014import org.opengion.fukurou.queue.QueueReceiveFactory; 015import org.opengion.fukurou.util.HybsTimerTask; 016import org.opengion.fukurou.util.StringUtil; 017import org.opengion.hayabusa.common.HybsSystem; 018import org.opengion.hayabusa.common.HybsSystemException; 019import org.opengion.hayabusa.queue.DBAccessQueue; 020 021/** 022 * メッセージキュー受信 メッセージキューの受信処理を行います。 023 * 024 * @og.group メッセージ連携 025 * 026 * @og.rev 5.10.15.2 (2019/09/20) 新規作成 027 * 028 * @version 5.0 029 * @author oota 030 * @since JDK7 031 * 032 */ 033public class Daemon_QueueReceive extends HybsTimerTask { 034 private int loopCnt = 0; 035 private QueueReceive queueReceive = null; 036 037 private static final int LOOP_COUNTER = 24; 038 039 private final String CLOUD_SQS_ACCESS_KEY = HybsSystem.sys("CLOUD_SQS_ACCESS_KEY"); 040 private final String CLOUD_SQS_SECRET_KEY = HybsSystem.sys("CLOUD_SQS_SECRET_KEY"); 041 private final String MQ_QUEUE_TYPE; 042 private final String MQ_QURUE_SERVER_URL = HybsSystem.sys("MQ_QUEUE_SERVER_URL"); 043 private final String MQ_QUEUE_RECEIVE_LISTENER =HybsSystem.sys("MQ_QUEUE_RECEIVE_LISTENER"); 044 045 private final String SYSTEM_ID = HybsSystem.sys("SYSTEM_ID"); 046 private final String USER_ID = "CYYYYY"; 047 private final String PG_ID; 048 private final String DMN_NAME = "QueueReceiveDMN"; 049 private final DBAccessQueue dbAccessQueue; 050 051 /** 052 * コンストラクター 053 * 初期処理を行います。 054 */ 055 public Daemon_QueueReceive() { 056 super(); 057 058 // パラメータの設定 059 if(!StringUtil.isNull(HybsSystem.sys("MQ_QUEUE_TYPE"))) { 060 MQ_QUEUE_TYPE = HybsSystem.sys("MQ_QUEUE_TYPE").toUpperCase(); 061 PG_ID = StringUtil.cut("QueRec" + MQ_QUEUE_TYPE, 10); 062 }else { 063 throw new RuntimeException("システムリソースにMQ_QUEUE_TYPEを登録して下さい"); 064 } 065 066 dbAccessQueue = new DBAccessQueue(SYSTEM_ID, USER_ID, PG_ID, DMN_NAME); 067 } 068 069 /** 070 * 初期処理 MQサーバに接続します。 071 */ 072 @Override 073 public void initDaemon() { 074 // 開始ログO 075 StringBuilder errMsg = new StringBuilder(); 076 if (MQ_QUEUE_TYPE == null) { 077 errMsg.append("MQ_QUEUE_TYPE"); 078 } 079 if (MQ_QURUE_SERVER_URL == null) { 080 errMsg.append(" MQ_QUEUE_SERVER_URL"); 081 } 082 083 if (errMsg.length() > 0) { 084 errMsg.append(" キュータイプを特定するために、左記のシステムリソースを登録して下さい。"); 085 throw new HybsSystemException(errMsg.toString()); 086 } 087 088 String queueType = MQ_QUEUE_TYPE.toUpperCase(); 089 090 // 開始ログ 091 System.out.println("MQキュータイプ:" + queueType); 092 System.out.println("MQサーバーURL:" + MQ_QURUE_SERVER_URL); 093 094 queueReceive = QueueReceiveFactory.newQueueReceive(queueType); 095 096 queueReceive.connect(MQ_QURUE_SERVER_URL, CLOUD_SQS_ACCESS_KEY, CLOUD_SQS_SECRET_KEY); 097 098 } 099 100 /** 101 * 開始処理 タイマータスクのデーモン処理の開始ポイントです。 102 */ 103 @Override 104 protected void startDaemon() { 105 if (loopCnt % LOOP_COUNTER == 0) { 106 loopCnt = 1; 107 System.out.println(); 108 System.out.print(toString() + " " + new Date() + " "); 109 } else { 110 // 対象 キュー名(グループ名)とbizlogic名の取得処理 111 String[][] ge67vals = dbAccessQueue.setlectGE67(); 112 // キュー情報登録チェック 113 if (ge67vals.length == 0) { 114 String errMsg = "GE67にキュー情報が登録されていません。"; 115 throw new RuntimeException(errMsg); 116 } 117 // MQとSQSで処理を分岐 118 // MQ:指定キューIDからキューメッセージを取得 119 // SQS:キューメッセージを取得してからキューID(グループID)を取得 120 switch (MQ_QUEUE_TYPE) { 121 case "MQ": 122 processMq(ge67vals); 123 break; 124 case "SQS": 125 processSqs(ge67vals); 126 break; 127 default: 128 String errMsg = "リソース(MQ_QUEUE_TYPE)の値が不正です。:" + MQ_QUEUE_TYPE; 129 throw new RuntimeException(errMsg); 130 } 131 132 loopCnt++; 133 } 134 } 135 136 /** 137 * MQ用の処理 138 * GE67に登録されているキューIDの、 139 * メッセージキューを取得して処理を行います。 140 * 141 * @param ge67vals 142 */ 143 private void processMq(final String[][] ge67vals) { 144 boolean listenerMode = false; 145 146 if("true".equals(MQ_QUEUE_RECEIVE_LISTENER)) { 147 listenerMode = true; 148 } 149 150 if(listenerMode) { 151 // リスナーの初期化 152 queueReceive.closeListener(); 153 } 154 155 // ge67のキューリスト分繰り返します 156 for (int row = 0; row < ge67vals.length; row++) { 157 String queueId = ge67vals[row][0]; 158 String bizLogicId = ge67vals[row][1]; 159 160 if(listenerMode) { 161 // リスナーを設定して、動的な受信処理(MQ専用) 162 QueueReceiveListener listener = new QueueReceiveListener(queueId, bizLogicId); 163 queueReceive.setListener(queueId, listener); 164 }else { 165 // 1件の受信処理 166 QueueInfo queueInfo = queueReceive.receive(queueId); 167 if (queueInfo != null) { 168 processMessage(queueId, bizLogicId, queueInfo.getMessage()); 169 // 1件処理を行ったら処理を終了します。 170 break; 171 } 172 } 173 } 174 } 175 176 /** 177 * SQS用の処理 178 * SQSはグループIDを指定して、キューを取得することはできず、 179 * 任意のキューを1つ取得してから、 180 * 判定処理を行います。 181 * GE67に登録されていないグループIDのキューが取得された場合は、 182 * GE68にエラーレコードを登録します。 183 * 184 * @param ge67vals 185 */ 186 private void processSqs(final String[][] ge67vals) { 187 // 下記はSQSの場合(キューを1件取得して処理) 188 QueueInfo queueInfo = queueReceive.receive(null); 189 190 // キューが未取得の場合 191 if(queueInfo == null) { 192 return; 193 } 194 195 // 受信したキューを処理 196 final String groupId = queueInfo.getSqsFifoGroupId(); 197 Boolean existsFlg = false; 198 // valsにグループIDのレコードが存在するか検索 199 for (int row = 0; row < ge67vals.length; row++) { 200 final String queueId = ge67vals[row][0]; 201 202 if (groupId != null && groupId.equals(queueId)) { 203 // 該当レコードあり 204 final String bizLogicId = ge67vals[row][1]; 205 processMessage(queueId, bizLogicId, queueInfo.getMessage()); 206 207 existsFlg = true; 208 break; 209 } 210 } 211 212 if (!existsFlg) { 213 // 該当groupIdの未登録エラー 214 // 処理番号生成 215 String syoriNo = dbAccessQueue.generateSyoriNo(); 216 dbAccessQueue.insertGE68(groupId, syoriNo, null, queueInfo.getMessage()); 217 dbAccessQueue.updateGE68Error(syoriNo, "SQSキューに設定されているグループIDが、GE67に未登録です。"); 218 } 219 } 220 221 /** 222 * キャンセル処理 223 * タイマータスクのデーモン処理の終了ポイントです。 224 */ 225 @Override 226 public boolean cancel() { 227 if (queueReceive != null) { 228 queueReceive.close(); 229 } 230 231 return super.cancel(); 232 } 233 234 /** 235 * メッセージの処理 236 * 受信したメッセージをbizLogicに渡して、 237 * 処理を実行します。 238 * 239 * @param queueId キューID 240 * @param bizLogicId bizLogicId 241 * @param msgText 受信メッセージ 242 */ 243 private void processMessage(final String queueId, final String bizLogicId, final String msgText) { 244 String syoriNo = ""; 245 try { 246 // 処理番号生成 247 syoriNo = dbAccessQueue.generateSyoriNo(); 248 249 // 管理テーブル登録 250 dbAccessQueue.insertGE68(queueId, syoriNo, bizLogicId, msgText); 251 252 // bizLogicの処理を実行 253 callActBizLogic(SYSTEM_ID, bizLogicId, msgText); 254 255 // 管理テーブル更新(完了) 256 dbAccessQueue.updateGE68(syoriNo, DBAccessQueue.FGKAN_END); 257 258 } catch (Throwable te) { 259 // bizLogicでのエラーはログの未出力して、処理を継続します。 260 // bizLogicのエラー情報はCauseに格納されているため、Causeから取得します。 261 String errMessage = null; 262 if (te.getCause() != null) { 263 // causeが設定されている場合のエラー情報 264 errMessage = te.getCause().getMessage(); 265 } else { 266 // causeが未設定の場合のエラー情報 267 errMessage = te.getMessage(); 268 } 269 System.out.println(errMessage); 270 try { 271 // エラーテーブルに登録 272 dbAccessQueue.updateGE68Error(syoriNo, errMessage); 273 } catch (Exception e) { 274 // ここでのエラーはスルーします。 275 System.out.println("管理テーブル登録エラー:" + e.getMessage()); 276 } 277 } 278 } 279 280 /** 281 * bizLogic処理の呼び出し 282 * 必要なパス情報をリソースから取得して、 283 * BizUtil.actBizLogicにパス情報を渡すことで、 284 * bizLogicの処理を行います。 285 * 286 * @param systemId システムID 287 * @param logicName ロジックファイル名 288 * @param msgText メッセージ 289 * @throws Throwable エラー情報 290 */ 291 private void callActBizLogic(String systemId, String logicName, String msgText) throws Throwable { 292 // 対象 クラスパスの生成 293 // HotDeploy機能を使用する場合に、Javaクラスをコンパイルするためのクラスパスを設定します。 294 // 対象となるクラスパスは、WEB-INF/classes 及び WEB-INF/lib/*.jar です。 295 // bizLogicTag.javaのコードを移植 296 StringBuilder sb = new StringBuilder(); 297 sb.append('.').append(File.pathSeparatorChar); 298 File lib = new File(HybsSystem.sys("REAL_PATH") + "WEB-INF" + File.separator + "lib"); 299 File[] libFiles = lib.listFiles(); 300 for (int i = 0; i < libFiles.length; i++) { 301 sb.append(libFiles[i].getAbsolutePath()).append(File.pathSeparatorChar); 302 } 303 sb.append(HybsSystem.sys("REAL_PATH") + "WEB-INF" + File.separator + "classes").append(File.pathSeparatorChar); 304 // bizの下のパス 305 sb.append(HybsSystem.sys("REAL_PATH") + HybsSystem.sys("BIZLOGIC_CLASS_PATH")).append(File.pathSeparatorChar); 306 // 上記で生成したクラスパスをclassPathに格納 307 String classPath = sb.toString(); 308 309 // ソースパス情報の生成 310 String srcDir = HybsSystem.sys("REAL_PATH") + HybsSystem.sys("BIZLOGIC_SRC_PATH"); 311 String classDir = HybsSystem.sys("REAL_PATH") + HybsSystem.sys("BIZLOGIC_CLASS_PATH"); 312 boolean isAutoCompile = HybsSystem.sysBool("BIZLOGIC_AUTO_COMPILE"); 313 boolean isHotDeploy = HybsSystem.sysBool("BIZLOGIC_HOT_DEPLOY"); 314 315 // bizLogicに渡すパラメータ 316 String[] keys = new String[] { "message" }; 317 String[] vals = new String[] { msgText }; 318 319 // bizLogic処理の実行 320 BizUtil.actBizLogic(srcDir, classDir, isAutoCompile, isHotDeploy, classPath, systemId, logicName, keys, vals); 321 } 322 323 /** 324 * 受信処理リスナー用のインナークラス 325 * QueueReceiveリスナークラス リスナー用のクラスです。 326 * MQに設定することで、メッセージが受信されると、 327 * onMessageメソッドが実行されます。 328 * 329 */ 330 class QueueReceiveListener implements MessageListener { 331 private String queueId = ""; 332 private String bizLogicId = ""; 333 334 /** 335 * コンストラクター 初期処理を行います。 336 * 337 * @param quId キューID 338 * @param bizId bizLogicId 339 */ 340 public QueueReceiveListener(final String quId, final String bizId) { 341 queueId = quId; 342 bizLogicId = bizId; 343 } 344 345 /** 346 * メッセージ受信処理 MQサーバにメッセージが受信されると、 メソッドの処理が行われます。 347 * 348 * @param Message 受信メッセージ 349 */ 350 @Override 351 public void onMessage(final Message message) { 352 // 要求番号 353 String ykno = ""; 354 355 // メッセージ受信 356 TextMessage msg = (TextMessage) message; 357 String msgText = ""; 358 359 try { 360 // キューサーバのメッセージを取得 361 msgText = msg.getText(); 362 363 // メーッセージの受信応答を返します。 364 msg.acknowledge(); 365 366 processMessage(queueId, bizLogicId, msgText); 367 368 } catch (JMSException jmse) { 369 try { 370 // 管理テーブル更新 371 // 管理テーブル更新(エラー) 372 dbAccessQueue.updateGE68(ykno, DBAccessQueue.FGKAN_ERROR); 373 } catch (Exception e) { 374 // ここでのエラーはスルーします。 375 System.out.println("管理テーブル登録エラー:" + e.getMessage()); 376 } 377 378 throw new HybsSystemException("bizLogicの処理中にエラーが発生しました。" + jmse.getMessage()); 379 } 380 } 381 } 382}