001/* 002 * Copyright (c) 2009 The openGion Project. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, 013 * either express or implied. See the License for the specific language 014 * governing permissions and limitations under the License. 015 */ 016 017package org.opengion.fukurou.queue; 018 019import javax.jms.Connection; 020import javax.jms.JMSException; 021import javax.jms.MessageProducer; 022import javax.jms.Queue; 023import javax.jms.QueueConnectionFactory; 024import javax.jms.QueueSession; 025import javax.jms.Session; 026import javax.jms.TextMessage; 027import javax.naming.Context; 028import javax.naming.InitialContext; 029import javax.naming.NamingException; 030 031import org.apache.activemq.ActiveMQConnection; 032import org.apache.activemq.ActiveMQConnectionFactory; 033 034/** 035 * MQサーバへのメッセージキュー送信用クラス 036 * 037 * MQサーバへのメッセージキュー送信用のクラスです。 038 * Apache ActiveMQとAmazonMQへの送信が可能です。 039 * tomcatからの送信(JNDI利用)と、 040 * バッチ処理(urlを指定し接続)の2通りが可能です。 041 * 042 * ※Apache ActiveMQとAmazonMQの切り替えは、 043 * jmsServerの接続先URLを変更するのみで接続の変更が可能です。 044 * (proxy環境からAmazonMqへの接続は行えない場合があります) 045 * 046 * @og.group メッセージ連携 047 * 048 * @og.rev 5.10.14.0 (2019/08/01) 新規作成 049 * 050 * @version 5 051 * @author oota 052 * @since JDK7 053 * 054 */ 055public class QueueSend_MQ implements QueueSend { 056 private Connection connection = null; 057 private Session session = null; 058 private MessageProducer sender = null; 059 private Context ctx = null; 060 // バッチ用フィールド 061 private boolean batch = false; 062 private String mqUserId = ""; 063 private String mqPassword = ""; 064 065 /** 066 * 接続処理 067 * MQサーバに接続を行います。 068 * 069 * @param jmsServer jmsサーバ接続名(バッチの場合はurl) 070 */ 071 public void connect(final String jmsServer) { 072 try { 073 ctx = new InitialContext(); 074 // 1. Connectionの作成s 075 QueueConnectionFactory factory = null; 076 if (batch) { 077 // バッチ処理の場合。URL指定で、ユーザIDとパスワードを指定して接続。 078 mqUserId = System.getProperty("mqUserId"); 079 mqPassword = System.getProperty("mqPassword"); 080 factory = new ActiveMQConnectionFactory(jmsServer); 081 connection = (ActiveMQConnection)factory.createConnection(mqUserId, mqPassword); 082 } else { 083 // tomcat接続の場合。JNDIを利用して接続。 084 factory = (QueueConnectionFactory) ctx.lookup("java:comp/env/" + jmsServer); 085 connection = (ActiveMQConnection)factory.createConnection(); 086 } 087 088 // 2. Connectioの開始 089 connection.start(); 090 091 } catch (JMSException jmse) { 092 throwErrMsg("MQサーバーの接続に失敗しました。" + jmse.getMessage()); 093 } catch (NamingException ne) { 094 throwErrMsg("名前解決に失敗しました。" + ne.getMessage()); 095 } 096 } 097 098 /** 099 * 接続処理 100 * MQサーバに接続します。 101 * connect(String jmsServer)と同じ処理になります。 102 * 103 * @og.rev 5.10.15.0 (2019/08/30) 引数追加対応 104 * 105 * @param jmsServer jmsサーバ情報 106 * @param sqsAccessKey アクセスキー(MQサーバでは未使用) 107 * @param sqsSecretKey シークレットキー(MQサーバでは未使用) 108 */ 109 @Override 110 public void connect(String jmsServer, String sqsAccessKey, String sqsSecretKey) { 111 // MQではsqsAccessKeyとsqsSecretKeyは利用しません。 112 connect(jmsServer); 113 } 114 115 116 /** 117 * エラーメッセージ送信 118 * 119 * @og.rev 5.10.15.0 (2019/08/30) Hybs除外 120 * 121 * @param errMsg エラーメッセージ 122 */ 123 public void throwErrMsg(final String errMsg) { 124 throw new RuntimeException( errMsg ); 125 } 126 127 /** 128 * メッセージ送信 129 * MQサーバにメッセージを送信します。 130 * 131 * @param queueInfo 送信キュー情報 132 */ 133 @Override 134 public void sendMessage(final QueueInfo queueInfo) { 135 try { 136 // 初期チェック 137 if (connection == null) { 138 throwErrMsg("MQサーバに接続されていません。"); 139 } 140 141 // 1. QueueSessionの作成 142 session = connection.createSession(queueInfo.isMqTransacted(), queueInfo.getMqAcknowledgeMode()); 143 if (session == null) { 144 throwErrMsg("キューセッションの生成に失敗しました。"); 145 } 146 147 // 2. Queueの作成 148 Queue queue = null; 149 queue = session.createQueue(queueInfo.getMqQueueName()); 150 sender = session.createProducer(queue); 151 152 // 3. テキストメッセージの作成 153 TextMessage msg = session.createTextMessage(queueInfo.getMessage()); 154 155 // 4. 送信処理 156 sender.send(msg); 157 158 } catch (JMSException e) { 159 throwErrMsg("キューの送信処理に失敗しました。" + e.getMessage()); 160 } 161 } 162 163 /** 164 * クローズ処理 165 * MQサーバとの接続をクローズします。 166 */ 167 @Override 168 public void close() { 169 if (ctx != null) { 170 try { 171 ctx.close(); 172 } catch (Exception e) { 173 System.out.println("ctxのクローズに失敗しました。"); 174 } 175 } 176 // 1. sender,session,connectionのクローズ処理 177 if (sender != null) { 178 try { 179 sender.close(); 180 } catch (Exception e) { 181 System.out.println("senderのクローズに失敗しました。"); 182 } 183 } 184 if (session != null) { 185 try { 186 session.close(); 187 } catch (Exception e) { 188 System.out.println("sessionのクローズに失敗しました。"); 189 } 190 } 191 if (connection != null) { 192 try { 193 connection.close(); 194 } catch (Exception e) { 195 System.out.println("connectionのクローズに失敗しました。"); 196 } 197 } 198 } 199 200 /** 201 * バッチ処理判定フラグを設定します。 202 * バッチ処理の場合は引数で接続先情報を与えます。 203 * それ以外の場合(Tomcat)ではJNDIより情報を取得します。 204 * 205 * @param batchFlg バッチ処理判定フラグ 206 */ 207 @Override 208 public void setBatchFlg(final Boolean batchFlg) { 209 batch = batchFlg; 210 } 211 212 /** 213 * テスト用メソッド 214 * テスト実行用です。 215 * 216 * @param args 引数 217 */ 218 public static void main(String[] args) { 219 System.out.println("main start"); 220 // 送信情報の設定 221 String url = "tcp://localhost:61616"; 222 String queueName = "test01"; 223 String msg = "送信メッセージ"; 224 225 QueueInfo queueInfo = new QueueInfo(); 226 queueInfo.setMqQueueName(queueName); 227 queueInfo.setMqTransacted(false); 228 queueInfo.setMqAcknowledgeMode(QueueSession.AUTO_ACKNOWLEDGE); 229 queueInfo.setMessage(msg); 230 231 QueueSend queueSend = new QueueSend_MQ(); 232 queueSend.setBatchFlg(true); 233 234 try { 235 queueSend.connect(url,null,null); 236 queueSend.sendMessage(queueInfo); 237 } catch (Exception e) { 238 System.out.println(e.getMessage()); 239 } finally { 240 queueSend.close(); 241 } 242 243 System.out.println("main end"); 244 } 245}