001package org.opengion.plugin.daemon; 002 003import java.io.File; 004import java.util.Date; 005import java.util.Locale; // 7.2.9.4 (2020/11/20) 006 007import javax.jms.JMSException; 008import javax.jms.Message; 009import javax.jms.MessageListener; 010import javax.jms.TextMessage; 011 012// import org.opengion.fukurou.util.BizUtil; 013import org.opengion.fukurou.business.BizUtil; 014import org.opengion.fukurou.queue.QueueInfo; 015import org.opengion.fukurou.queue.QueueReceive; 016import org.opengion.fukurou.queue.QueueReceiveFactory; 017import org.opengion.fukurou.util.HybsTimerTask; 018import org.opengion.fukurou.util.StringUtil; 019import org.opengion.hayabusa.common.HybsSystem; 020import org.opengion.hayabusa.common.HybsSystemException; 021import org.opengion.hayabusa.queue.DBAccessQueue; 022 023/** 024 * メッセージキュー受信 メッセージキューの受信処理を行います。 025 * 026 * @og.group メッセージ連携 027 * 028 * @og.rev 5.10.15.2 (2019/09/20) 新規作成 029 * 030 * @version 5.0 031 * @author oota 032 * @since JDK7 033 * 034 */ 035public class Daemon_QueueReceive extends HybsTimerTask { 036 /** このプログラムのVERSION文字列を設定します。 {@value} */ 037 private static final String VERSION = "7.2.9.4 (2020/11/20)" ; 038 039 private int loopCnt ; 040 private QueueReceive queueReceive ; 041 042 private static final int LOOP_COUNTER = 24; 043 private static final char FPSC = File.pathSeparatorChar ; // 7.2.9.4 (2020/11/20) システムに依存するパス区切り文字 044 045 private final String CLOUD_SQS_ACCESS_KEY = HybsSystem.sys("CLOUD_SQS_ACCESS_KEY"); 046 private final String CLOUD_SQS_SECRET_KEY = HybsSystem.sys("CLOUD_SQS_SECRET_KEY"); 047 private final String MQ_QUEUE_TYPE; 048 private final String MQ_QURUE_SERVER_URL = HybsSystem.sys("MQ_QUEUE_SERVER_URL"); 049 private final String MQ_QUEUE_RECEIVE_LISTENER =HybsSystem.sys("MQ_QUEUE_RECEIVE_LISTENER"); 050 051 private final String SYSTEM_ID = HybsSystem.sys("SYSTEM_ID"); 052 private final String USER_ID = "CYYYYY"; 053 private final String PG_ID; 054 private final String DMN_NAME = "QueueReceiveDMN"; 055 private final DBAccessQueue dbAccessQueue; 056 057 private final String REAL_PATH = HybsSystem.sys("REAL_PATH"); // 7.2.9.4 (2020/11/20) 058 059 /** 060 * コンストラクター 061 * 初期処理を行います。 062 * 063 * @og.rev 7.2.9.4 (2020/11/20) spotbugs:呼び出したメソッドの Locale パラメータの使用を検討する 064 */ 065 public Daemon_QueueReceive() { 066 super(); 067 068 // パラメータの設定 069 // 7.2.9.4 (2020/11/20) PMD:Avoid if (x != y) ..; else ..; 070 if(StringUtil.isNull(HybsSystem.sys("MQ_QUEUE_TYPE"))) { 071 throw new RuntimeException("システムリソースにMQ_QUEUE_TYPEを登録して下さい"); 072 }else { 073// MQ_QUEUE_TYPE = HybsSystem.sys("MQ_QUEUE_TYPE").toUpperCase(); 074 MQ_QUEUE_TYPE = HybsSystem.sys("MQ_QUEUE_TYPE").toUpperCase( Locale.JAPAN ); // 7.2.9.4 (2020/11/20) 075 PG_ID = StringUtil.cut("QueRec" + MQ_QUEUE_TYPE, 10); 076 } 077 078 dbAccessQueue = new DBAccessQueue(SYSTEM_ID, USER_ID, PG_ID, DMN_NAME); 079 080// // パラメータの設定 081// if(!StringUtil.isNull(HybsSystem.sys("MQ_QUEUE_TYPE"))) { 082//// MQ_QUEUE_TYPE = HybsSystem.sys("MQ_QUEUE_TYPE").toUpperCase(); 083// MQ_QUEUE_TYPE = HybsSystem.sys("MQ_QUEUE_TYPE").toUpperCase( Locale.JAPAN ); // 7.2.9.4 (2020/11/20) 084// PG_ID = StringUtil.cut("QueRec" + MQ_QUEUE_TYPE, 10); 085// }else { 086// throw new RuntimeException("システムリソースにMQ_QUEUE_TYPEを登録して下さい"); 087// } 088// 089// dbAccessQueue = new DBAccessQueue(SYSTEM_ID, USER_ID, PG_ID, DMN_NAME); 090 } 091 092 /** 093 * 初期処理 MQサーバに接続します。 094 * 095 * @og.rev 7.2.9.4 (2020/11/20) spotbugs:呼び出したメソッドの Locale パラメータの使用を検討する 096 */ 097 @Override 098 public void initDaemon() { 099 // 開始ログO 100 final StringBuilder errMsg = new StringBuilder(); 101 if (MQ_QUEUE_TYPE == null) { 102 errMsg.append("MQ_QUEUE_TYPE"); 103 } 104 if (MQ_QURUE_SERVER_URL == null) { 105 errMsg.append(" MQ_QUEUE_SERVER_URL"); 106 } 107 108 if (errMsg.length() > 0) { 109 errMsg.append(" キュータイプを特定するために、左記のシステムリソースを登録して下さい。"); 110 throw new HybsSystemException(errMsg.toString()); 111 } 112 113// final String queueType = MQ_QUEUE_TYPE.toUpperCase(); 114 final String queueType = MQ_QUEUE_TYPE.toUpperCase( Locale.JAPAN ); // 7.2.9.4 (2020/11/20) 115 116 // 開始ログ 117 System.out.println("MQキュータイプ:" + queueType); 118 System.out.println("MQサーバーURL:" + MQ_QURUE_SERVER_URL); 119 120 queueReceive = QueueReceiveFactory.newQueueReceive(queueType); 121 122 queueReceive.connect(MQ_QURUE_SERVER_URL, CLOUD_SQS_ACCESS_KEY, CLOUD_SQS_SECRET_KEY); 123 } 124 125 /** 126 * 開始処理 タイマータスクのデーモン処理の開始ポイントです。 127 */ 128 @Override 129 protected void startDaemon() { 130 if (loopCnt % LOOP_COUNTER == 0) { 131 loopCnt = 1; 132 System.out.println(); 133 System.out.print(toString() + " " + new Date() + " "); 134 } else { 135 // 対象 キュー名(グループ名)とbizlogic名の取得処理 136 final String[][] ge67vals = dbAccessQueue.setlectGE67(); 137 // キュー情報登録チェック 138 if (ge67vals.length == 0) { 139 final String errMsg = "GE67にキュー情報が登録されていません。"; 140 throw new RuntimeException(errMsg); 141 } 142 // MQとSQSで処理を分岐 143 // MQ:指定キューIDからキューメッセージを取得 144 // SQS:キューメッセージを取得してからキューID(グループID)を取得 145 switch (MQ_QUEUE_TYPE) { 146 case "MQ": 147 processMq(ge67vals); 148 break; 149 case "SQS": 150 processSqs(ge67vals); 151 break; 152 default: 153 final String errMsg = "リソース(MQ_QUEUE_TYPE)の値が不正です。:" + MQ_QUEUE_TYPE; 154 throw new RuntimeException(errMsg); 155 } 156 157 loopCnt++; 158 } 159 } 160 161 /** 162 * MQ用の処理 163 * GE67に登録されているキューIDの、 164 * メッセージキューを取得して処理を行います。 165 * 166 * @param ge67vals GE67の配列データ 167 */ 168 private void processMq(final String[][] ge67vals) { 169 boolean listenerMode = false; 170 171 if("true".equals(MQ_QUEUE_RECEIVE_LISTENER)) { 172 listenerMode = true; 173 } 174 175 if(listenerMode) { 176 // リスナーの初期化 177 queueReceive.closeListener(); 178 } 179 180 // ge67のキューリスト分繰り返します 181 for (int row = 0; row < ge67vals.length; row++) { 182 final String queueId = ge67vals[row][0]; 183 final String bizLogicId = ge67vals[row][1]; 184 185 if(listenerMode) { 186 // リスナーを設定して、動的な受信処理(MQ専用) 187 final QueueReceiveListener listener = new QueueReceiveListener(queueId, bizLogicId); 188 queueReceive.setListener(queueId, listener); 189 }else { 190 // 1件の受信処理 191 final QueueInfo queueInfo = queueReceive.receive(queueId); 192 if (queueInfo != null) { 193 processMessage(queueId, bizLogicId, queueInfo.getMessage()); 194 // 1件処理を行ったら処理を終了します。 195 break; 196 } 197 } 198 } 199 } 200 201 /** 202 * SQS用の処理 203 * SQSはグループIDを指定して、キューを取得することはできず、 204 * 任意のキューを1つ取得してから、 205 * 判定処理を行います。 206 * GE67に登録されていないグループIDのキューが取得された場合は、 207 * GE68にエラーレコードを登録します。 208 * 209 * @param ge67vals GE67の配列データ 210 */ 211 private void processSqs(final String[][] ge67vals) { 212 // 下記はSQSの場合(キューを1件取得して処理) 213 final QueueInfo queueInfo = queueReceive.receive(null); 214 215 // キューが未取得の場合 216 if(queueInfo == null) { 217 return; 218 } 219 220 // 受信したキューを処理 221 final String groupId = queueInfo.getSqsFifoGroupId(); 222 Boolean existsFlg = false; 223 // valsにグループIDのレコードが存在するか検索 224 for (int row = 0; row < ge67vals.length; row++) { 225 final String queueId = ge67vals[row][0]; 226 227 if (groupId != null && groupId.equals(queueId)) { 228 // 該当レコードあり 229 final String bizLogicId = ge67vals[row][1]; 230 processMessage(queueId, bizLogicId, queueInfo.getMessage()); 231 232 existsFlg = true; 233 break; 234 } 235 } 236 237 if (!existsFlg) { 238 // 該当groupIdの未登録エラー 239 // 処理番号生成 240 final String syoriNo = dbAccessQueue.generateSyoriNo(); 241 dbAccessQueue.insertGE68(groupId, syoriNo, null, queueInfo.getMessage()); 242 dbAccessQueue.updateGE68Error(syoriNo, "SQSキューに設定されているグループIDが、GE67に未登録です。"); 243 } 244 } 245 246 /** 247 * キャンセル処理 248 * タイマータスクのデーモン処理の終了ポイントです。 249 * 250 * @return キャンセルできれば、true 251 */ 252 @Override 253 public boolean cancel() { 254 if (queueReceive != null) { 255 queueReceive.close(); 256 } 257 258 return super.cancel(); 259 } 260 261 /** 262 * メッセージの処理 263 * 受信したメッセージをbizLogicに渡して、 264 * 処理を実行します。 265 * 266 * @param queueId キューID 267 * @param bizLogicId ビズロジックID 268 * @param msgText 受信メッセージ 269 */ 270 private void processMessage(final String queueId, final String bizLogicId, final String msgText) { 271 String syoriNo = ""; 272 try { 273 // 処理番号生成 274 syoriNo = dbAccessQueue.generateSyoriNo(); 275 276 // 管理テーブル登録 277 dbAccessQueue.insertGE68(queueId, syoriNo, bizLogicId, msgText); 278 279 // bizLogicの処理を実行 280 callActBizLogic(SYSTEM_ID, bizLogicId, msgText); 281 282 // 管理テーブル更新(完了) 283 dbAccessQueue.updateGE68(syoriNo, DBAccessQueue.FGKAN_END); 284 285 } catch (Throwable te) { 286 // bizLogicでのエラーはログの未出力して、処理を継続します。 287 // bizLogicのエラー情報はCauseに格納されているため、Causeから取得します。 288 String errMessage = null; 289 if (te.getCause() != null) { 290 // causeが設定されている場合のエラー情報 291 errMessage = te.getCause().getMessage(); 292 } else { 293 // causeが未設定の場合のエラー情報 294 errMessage = te.getMessage(); 295 } 296 System.out.println(errMessage); 297 try { 298 // エラーテーブルに登録 299 dbAccessQueue.updateGE68Error(syoriNo, errMessage); 300 } catch (Exception e) { 301 // ここでのエラーはスルーします。 302 System.out.println("管理テーブル登録エラー:" + e.getMessage()); 303 } 304 } 305 } 306 307 /** 308 * bizLogic処理の呼び出し 309 * 必要なパス情報をリソースから取得して、 310 * BizUtil.actBizLogicにパス情報を渡すことで、 311 * bizLogicの処理を行います。 312 * 313 * @og.rev 7.2.9.4 (2020/11/20) spotbugs:null になっている可能性があるメソッドの戻り値を利用している 314 * 315 * @param systemId システムID 316 * @param logicName ロジックファイル名 317 * @param msgText メッセージ 318 * @throws Throwable エラー情報 319 */ 320 private void callActBizLogic(final String systemId, final String logicName, final String msgText) throws Throwable { 321 // 対象 クラスパスの生成 322 // HotDeploy機能を使用する場合に、Javaクラスをコンパイルするためのクラスパスを設定します。 323 // 対象となるクラスパスは、WEB-INF/classes 及び WEB-INF/lib/*.jar です。 324 // bizLogicTag.javaのコードを移植 325 final String classDir = REAL_PATH + HybsSystem.sys( "BIZLOGIC_CLASS_PATH" ); // bizの下のパス 326 final String webIinf = REAL_PATH + "WEB-INF" + File.separator ; 327 328 final StringBuilder sb = new StringBuilder().append('.').append(FPSC); 329 330 final File lib = new File( webIinf + "lib"); 331 final File[] libFiles = lib.listFiles(); 332 if( libFiles != null ) { 333 // 7.2.9.4 (2020/11/20) PMD:This for loop can be replaced by a foreach loop 334 for( final File file : libFiles ) { 335 sb.append( file.getAbsolutePath() ).append(FPSC); 336 } 337// for (int i = 0; i < libFiles.length; i++) { 338// sb.append( libFiles[i].getAbsolutePath() ).append(FPSC); 339// } 340 } 341 342 // 上記で生成したクラスパスをclassPathに格納 343 final String classPath = 344 sb.append( webIinf ).append( "classes" ).append(FPSC) 345 .append( classDir ).append(FPSC) // bizの下のパス 346 .toString(); 347 348 // ソースパス情報の生成 349 final String srcDir = REAL_PATH + HybsSystem.sys( "BIZLOGIC_SRC_PATH" ); 350 final boolean isAutoCompile = HybsSystem.sysBool( "BIZLOGIC_AUTO_COMPILE" ); 351 final boolean isHotDeploy = HybsSystem.sysBool( "BIZLOGIC_HOT_DEPLOY" ); 352 353 // bizLogicに渡すパラメータ 354 final String[] keys = new String[] { "message" }; 355 final String[] vals = new String[] { msgText }; 356 357 // bizLogic処理の実行 358 BizUtil.actBizLogic( srcDir, classDir, isAutoCompile, isHotDeploy, classPath, systemId, logicName, keys, vals ); 359 } 360 361// 7.2.9.4 (2020/11/20) spotbugs:null になっている可能性があるメソッドの戻り値を利用している 362// private void callActBizLogic(final String systemId, final String logicName, final String msgText) throws Throwable { 363// // 対象 クラスパスの生成 364// // HotDeploy機能を使用する場合に、Javaクラスをコンパイルするためのクラスパスを設定します。 365// // 対象となるクラスパスは、WEB-INF/classes 及び WEB-INF/lib/*.jar です。 366// // bizLogicTag.javaのコードを移植 367// final StringBuilder sb = new StringBuilder(); 368// sb.append('.').append(File.pathSeparatorChar); 369// final File lib = new File(HybsSystem.sys("REAL_PATH") + "WEB-INF" + File.separator + "lib"); 370// final File[] libFiles = lib.listFiles(); 371// for (int i = 0; i < libFiles.length; i++) { 372// sb.append(libFiles[i].getAbsolutePath()).append(File.pathSeparatorChar); 373// } 374// sb.append(HybsSystem.sys("REAL_PATH") + "WEB-INF" + File.separator + "classes").append(File.pathSeparatorChar); 375// // bizの下のパス 376// sb.append(HybsSystem.sys("REAL_PATH") + HybsSystem.sys("BIZLOGIC_CLASS_PATH")).append(File.pathSeparatorChar); 377// // 上記で生成したクラスパスをclassPathに格納 378// final String classPath = sb.toString(); 379// 380// // ソースパス情報の生成 381// final String srcDir = HybsSystem.sys("REAL_PATH") + HybsSystem.sys("BIZLOGIC_SRC_PATH"); 382// final String classDir = HybsSystem.sys("REAL_PATH") + HybsSystem.sys("BIZLOGIC_CLASS_PATH"); 383// final boolean isAutoCompile = HybsSystem.sysBool("BIZLOGIC_AUTO_COMPILE"); 384// final boolean isHotDeploy = HybsSystem.sysBool("BIZLOGIC_HOT_DEPLOY"); 385// 386// // bizLogicに渡すパラメータ 387// final String[] keys = new String[] { "message" }; 388// final String[] vals = new String[] { msgText }; 389// 390// // bizLogic処理の実行 391// BizUtil.actBizLogic(srcDir, classDir, isAutoCompile, isHotDeploy, classPath, systemId, logicName, keys, vals); 392// } 393 394 /** 395 * 受信処理リスナー用のインナークラス 396 * QueueReceiveリスナークラス リスナー用のクラスです。 397 * MQに設定することで、メッセージが受信されると、 398 * onMessageメソッドが実行されます。 399 * 400 * @og.rev 7.2.9.4 (2020/11/20) private final 追加 401 */ 402// class QueueReceiveListener implements MessageListener { 403 private final class QueueReceiveListener implements MessageListener { 404// private String queueId = ""; 405// private String bizLogicId = ""; 406 private final String queueId ; 407 private final String bizLogicId ; 408 409 /** 410 * コンストラクター 初期処理を行います。 411 * 412 * @param quId キューID 413 * @param bizId ビズロジックID 414 */ 415 public QueueReceiveListener(final String quId, final String bizId) { 416 queueId = quId; 417 bizLogicId = bizId; 418 } 419 420 /** 421 * メッセージ受信処理 MQサーバにメッセージが受信されると、 メソッドの処理が行われます。 422 * 423 * @param message 受信メッセージ 424 */ 425 @Override 426 public void onMessage(final Message message) { 427 // 要求番号 : ここでは使用していません。 428 final String ykno = ""; 429 430 // メッセージ受信 431 final TextMessage msg = (TextMessage) message; 432 String msgText = ""; 433 434 try { 435 // キューサーバのメッセージを取得 436 msgText = msg.getText(); 437 438 // メーッセージの受信応答を返します。 439 msg.acknowledge(); 440 441 processMessage(queueId, bizLogicId, msgText); 442 443 } catch (JMSException jmse) { 444 try { 445 // 管理テーブル更新 446 // 管理テーブル更新(エラー) 447 dbAccessQueue.updateGE68(ykno, DBAccessQueue.FGKAN_ERROR); 448 } catch (Exception e) { 449 // ここでのエラーはスルーします。 450 System.out.println("管理テーブル登録エラー:" + e.getMessage()); 451 } 452 453 throw new HybsSystemException("bizLogicの処理中にエラーが発生しました。" + jmse.getMessage()); 454 } 455 } 456 } 457}