001/* 002 * Copyright (c) 2009 The openGion Project. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, 013 * either express or implied. See the License for the specific language 014 * governing permissions and limitations under the License. 015 */ 016 017package org.opengion.fukurou.queue; 018 019import javax.jms.Connection; 020import javax.jms.JMSException; 021import javax.jms.MessageProducer; 022import javax.jms.Queue; 023import javax.jms.QueueConnectionFactory; 024import javax.jms.QueueSession; 025import javax.jms.Session; 026import javax.jms.TextMessage; 027import javax.naming.Context; 028import javax.naming.InitialContext; 029import javax.naming.NamingException; 030 031import org.apache.activemq.ActiveMQConnection; 032import org.apache.activemq.ActiveMQConnectionFactory; 033// import org.opengion.hayabusa.common.HybsSystemException; 034 035// import com.sun.star.uno.RuntimeException; 036 037/** 038 * MQサーバへのメッセージキュー送信用クラス 039 * 040 * MQサーバへのメッセージキュー送信用のクラスです。 041 * Apache ActiveMQとAmazonMQへの送信が可能です。 042 * tomcatからの送信(JNDI利用)と、 043 * バッチ処理(urlを指定し接続)の2通りが可能です。 044 * 045 * ※Apache ActiveMQとAmazonMQの切り替えは、 046 * jmsServerの接続先URLを変更するのみで接続の変更が可能です。 047 * (proxy環境からAmazonMqへの接続は行えない場合があります) 048 * 049 * @og.group メッセージ連携 050 * 051 * @og.rev 5.10.14.0 (2019/08/01) 新規作成 052 * 053 * @version 5 054 * @author oota 055 * @since JDK7 056 * 057 */ 058public class QueueSend_MQ implements QueueSend { 059 private Connection connection; 060 private Session session; 061 private MessageProducer sender; 062 private Context ctx; 063 // バッチ用フィールド 064 private boolean batch; 065 private String mqUserId = ""; 066 private String mqPassword = ""; 067 068 /** 069 * 接続処理 070 * MQサーバに接続を行います。 071 * 072 * @param jmsServer jmsサーバ接続名(バッチの場合はurl) 073 */ 074 public void connect(final String jmsServer) { 075 try { 076 ctx = new InitialContext(); 077 // 1. Connectionの作成s 078// QueueConnectionFactory factory = null; 079 final QueueConnectionFactory factory; 080 if (batch) { 081 // バッチ処理の場合。URL指定で、ユーザIDとパスワードを指定して接続。 082 mqUserId = System.getProperty("mqUserId"); 083 mqPassword = System.getProperty("mqPassword"); 084 factory = new ActiveMQConnectionFactory(jmsServer); 085 connection = (ActiveMQConnection)factory.createConnection(mqUserId, mqPassword); 086 } else { 087 // tomcat接続の場合。JNDIを利用して接続。 088 factory = (QueueConnectionFactory) ctx.lookup("java:comp/env/" + jmsServer); 089 connection = (ActiveMQConnection)factory.createConnection(); 090 } 091 092 // 2. Connectioの開始 093 connection.start(); 094 095 } catch (final JMSException jmse) { 096 throwErrMsg("MQサーバーの接続に失敗しました。" + jmse.getMessage()); 097 } catch (final NamingException ne) { 098 throwErrMsg("名前解決に失敗しました。" + ne.getMessage()); 099 } 100 } 101 102 /** 103 * 接続処理 104 * MQサーバに接続します。 105 * connect(String jmsServer)と同じ処理になります。 106 * 107 * @og.rev 5.10.15.0 (2019/08/30) 引数追加対応 108 * 109 * @param jmsServer jmsサーバ情報 110 * @param sqsAccessKey アクセスキー(MQサーバでは未使用) 111 * @param sqsSecretKey シークレットキー(MQサーバでは未使用) 112 */ 113 @Override 114 public void connect(final String jmsServer, final String sqsAccessKey, final String sqsSecretKey) { 115 // MQではsqsAccessKeyとsqsSecretKeyは利用しません。 116 connect(jmsServer); 117 } 118 119 /** 120 * エラーメッセージ送信。 121 * 122 * @og.rev 5.10.15.0 (2019/08/30) Hybs除外 123 * 124 * @param errMsg エラーメッセージ 125 */ 126 public void throwErrMsg(final String errMsg) { 127 throw new RuntimeException( errMsg ); 128// if (batch) { 129// // バッチ用エラー 130// throw new RuntimeException(errMsg); 131// } else { 132// // 画面用エラー 133// throw new HybsSystemException(errMsg); 134// } 135 } 136 137 /** 138 * メッセージ送信 139 * MQサーバにメッセージを送信します。 140 * 141 * @param queueInfo 送信キュー情報 142 */ 143 @Override 144 public void sendMessage(final QueueInfo queueInfo) { 145 try { 146 // 初期チェック 147 if (connection == null) { 148 throwErrMsg("MQサーバに接続されていません。"); 149 } 150 151 // 1. QueueSessionの作成 152 session = connection.createSession(queueInfo.isMqTransacted(), queueInfo.getMqAcknowledgeMode()); 153 if (session == null) { 154 throwErrMsg("キューセッションの生成に失敗しました。"); 155 } 156 157 // 2. Queueの作成 158// Queue queue = null; 159// queue = session.createQueue(queueInfo.getMqQueueName()); 160 final Queue queue = session.createQueue(queueInfo.getMqQueueName()); 161 sender = session.createProducer(queue); 162 163 // 3. テキストメッセージの作成 164 final TextMessage msg = session.createTextMessage(queueInfo.getMessage()); 165 166 // 4. 送信処理 167 sender.send(msg); 168 169 } catch (JMSException e) { 170 throwErrMsg("キューの送信処理に失敗しました。" + e.getMessage()); 171 } 172 } 173 174 /** 175 * クローズ処理 176 * MQサーバとの接続をクローズします。 177 */ 178 @Override 179 public void close() { 180 if (ctx != null) { 181 try { 182 ctx.close(); 183 } catch (Exception e) { 184 System.out.println("ctxのクローズに失敗しました。"); 185 } 186 } 187 // 1. sender,session,connectionのクローズ処理 188 if (sender != null) { 189 try { 190 sender.close(); 191 } catch (Exception e) { 192 System.out.println("senderのクローズに失敗しました。"); 193 } 194 } 195 if (session != null) { 196 try { 197 session.close(); 198 } catch (Exception e) { 199 System.out.println("sessionのクローズに失敗しました。"); 200 } 201 } 202 if (connection != null) { 203 try { 204 connection.close(); 205 } catch (Exception e) { 206 System.out.println("connectionのクローズに失敗しました。"); 207 } 208 } 209 } 210 211 /** 212 * バッチ処理判定フラグを設定します。 213 * バッチ処理の場合は引数で接続先情報を与えます。 214 * それ以外の場合(Tomcat)ではJNDIより情報を取得します。 215 * 216 * @param batchFlg バッチ処理判定フラグ 217 */ 218 @Override 219 public void setBatchFlg(final Boolean batchFlg) { 220 batch = batchFlg; 221 } 222 223 /** 224 * テスト用メソッド 225 * テスト実行用です。 226 * 227 * @param args 引数 228 */ 229 public static void main(final String[] args) { 230 System.out.println("main start"); 231 // 送信情報の設定 232 final String url = "tcp://localhost:61616"; 233 final String queueName = "test01"; 234 final String msg = "送信メッセージ"; 235 236 final QueueInfo queueInfo = new QueueInfo(); 237 queueInfo.setMqQueueName(queueName); 238 queueInfo.setMqTransacted(false); 239 queueInfo.setMqAcknowledgeMode(QueueSession.AUTO_ACKNOWLEDGE); 240 queueInfo.setMessage(msg); 241 242 final QueueSend queueSend = new QueueSend_MQ(); 243 queueSend.setBatchFlg(true); 244 245 try { 246 queueSend.connect(url,null,null); 247// queueSend.connect(url); 248 queueSend.sendMessage(queueInfo); 249 } catch (final Exception e) { 250 System.out.println(e.getMessage()); 251 } finally { 252 queueSend.close(); 253 } 254 255 System.out.println("main end"); 256 } 257}