001package org.opengion.plugin.daemon; 002 003import java.util.Date; 004 005import javax.jms.QueueSession; 006 007import org.opengion.fukurou.util.StringUtil; 008import org.opengion.fukurou.queue.QueueInfo; 009import org.opengion.fukurou.queue.QueueSend; 010import org.opengion.fukurou.queue.QueueSendFactory; 011import org.opengion.fukurou.util.HybsTimerTask; 012import org.opengion.hayabusa.common.HybsSystem; 013import org.opengion.hayabusa.queue.DBAccessQueue; 014 015/** 016 * メッセージキュー送信 017 * メッセージキュー送信テーブルを監視して、 018 * 送信処理を行います。 019 * 020 * @og.group メッセージ連携 021 * 022 * @og.rev 5.10.15.0 (2019/08/30) 新規作成 023 * @og.rev 5.10.15.2 (2019/09/20) DB登録の実装をhayabusa.queueに移動 024 * 025 * @version 5.0 026 * @author oota 027 * @since JDK7 028 * 029 */ 030public class Daemon_QueueSend extends HybsTimerTask { 031 private int loopCnt = 0; 032 private static final int LOOP_COUNTER = 24; 033 private QueueSend queueSend; 034 035 private String SYSTEM_ID = HybsSystem.sys("SYSTEM_ID"); 036 private static final String CLOUD_SQS_ACCESS_KEY = HybsSystem.sys( "CLOUD_SQS_ACCESS_KEY" ); 037 private static final String CLOUD_SQS_SECRET_KEY = HybsSystem.sys( "CLOUD_SQS_SECRET_KEY" ); 038 private final String USER_ID = "CYYYYY"; 039 private final String PG_ID = "DMN_QueSnd"; 040 private final String DMN_NAME = "QueueReceiveDMN"; 041 private final DBAccessQueue dbAccessQueue; 042 043 /** 044 * コンストラクター 045 * 初期処理を行います。 046 */ 047 public Daemon_QueueSend(){ 048 dbAccessQueue = new DBAccessQueue(SYSTEM_ID, USER_ID, PG_ID, DMN_NAME); 049 } 050 /** 051 * 開始処理 052 * 053 * 054 * タイマータスクのデーモン処理の開始ポイントです。 055 * 056 * @og.rev 5.10.16.1 (2019/10/11) StringUtil変更 057 * 058 */ 059 @Override 060 protected void startDaemon() { 061 if (loopCnt % LOOP_COUNTER == 0) { 062 loopCnt = 1; 063 System.out.println(); 064 System.out.println(toString() + " " + new Date() + ""); 065 } else { 066 // メッセージキュー送信管理テーブルから、送信対象のレコードを取得 067 String[][] vals = dbAccessQueue.selectGE65(); 068 069 // 取得データ分の繰り返し処理を実行する 070 for(int i = 0; i < vals.length; i++) { 071 String[] record = vals[i]; 072 073 // GE65から取得した値を変数に格納 074 String ykno = record[0]; 075 String queueId = record[1]; 076 String message = record[2]; 077 String dedupliId = record[3]; 078 String queSyu = record[4]; 079 String jmsUrl = record[5]; 080 081 String queueType = queSyu.toUpperCase(); 082 queueSend = QueueSendFactory.newQueueSend(queueType); 083 084 // 接続処理 085 queueSend.connect(jmsUrl, CLOUD_SQS_ACCESS_KEY, CLOUD_SQS_SECRET_KEY); 086 087 // メッセージ送信管理テーブルから取得したデータを送信実装予定 088 QueueInfo queueInfo = new QueueInfo(); 089 090 // 応答確認種別 091 if("MQ".equals(queueType)){ 092 // MQメッセージサーバ指定時 093 queueInfo.setMqTransacted(false); 094 queueInfo.setMqAcknowledgeMode(QueueSession.AUTO_ACKNOWLEDGE); 095 // キュー名 096 queueInfo.setMqQueueName(queueId); 097 }else if("SQS".equals(queueType)){ 098 // SQSメッセージサーバ指定時 099 // グループID 100 queueInfo.setSqsFifoGroupId(queueId); 101// if(!StringUtil.isEmpty(dedupliId)) { 102 if(!StringUtil.isNull(dedupliId)) { 103 // 重複排除ID 104 // コンテンツに基づく重複排除が有効時は、未設定でも可(メッセージによる重複判定が行われる) 105 queueInfo.setSqsFifoDedupliId(dedupliId); 106 } 107 } 108 109 // メッセージ 110 queueInfo.setMessage(message); 111 112 // 完了フラグを処理中:2に更新 113 dbAccessQueue.updateGE66(ykno, DBAccessQueue.FGKAN_PROCESS); 114 115 // メッセージ送信処理 116 try{ 117 queueSend.sendMessage(queueInfo); 118 119 // 完了フラグを完了:3に更新 120 dbAccessQueue.updateGE66(ykno, DBAccessQueue.FGKAN_END); 121 122 }catch(Exception e) { 123 // 完了フラグをエラー:4に更新して、エラー情報を登録 124 dbAccessQueue.updateGE66Error(ykno, e.getMessage()); 125 } 126 } 127 128 // クローズ処理 129 queueSend.close(); 130 131 loopCnt++; 132 } 133 } 134}