001package org.opengion.fukurou.queue;
002
003import java.util.ArrayList;
004import java.util.List;
005
006import javax.jms.JMSException;
007import javax.jms.Message;
008import javax.jms.MessageListener;
009import javax.jms.Queue;
010import javax.jms.QueueConnection;
011import javax.jms.QueueConnectionFactory;
012import javax.jms.QueueReceiver;
013import javax.jms.QueueSession;
014import javax.jms.TextMessage;
015import javax.naming.Context;
016import javax.naming.InitialContext;
017
018import org.apache.activemq.ActiveMQConnectionFactory;
019
020/**
021 * MQメッセージ受信用クラス。
022 *
023 * @og.group メッセージ連携
024 *
025 * @og.rev 5.10.15.2 (2019/09/20) 新規作成
026 * 
027 * @version 5
028 * @author oota
029 * @since JDK7
030 */
031public class QueueReceive_MQ implements QueueReceive{
032
033        private QueueConnection connection = null;
034        private QueueSession session = null;
035        private QueueReceiver receiver = null;
036        List<QueueReceiver> listReceiver = null;
037        private boolean batch = false;
038        
039        /**
040         * 接続処理
041         * メッセージキューサーバに接続します。
042         * 
043         *  @param jmsServer jsmサーバ
044         *  @param sqsAccessKey sqs用awsアクセスキー(MQでは利用しません)
045         *  @param sqsSecretKey sqs用awsシークレットキー(MQでは利用しません)
046         */
047        public void connect(final String jmsServer, final String sqsAccessKey, final String sqsSecretKey) {
048                connect(jmsServer);
049        }
050        
051        /**
052         * 接続処理
053         * jmsServerに接続します。
054         * MQの場合は、受信リスナーを設定して、随時メッセージ受信処理を行います。
055         * SQSの場合は最大受信件数の10件の処理を行います。
056         * 
057         * @param jmsServer 接続先情報 MQ:jndi接続先 SQS:URL
058         */
059        private void connect(final String jmsServer) {  
060                try {
061                        if(batch) {
062                                // バッチ用
063                                String mqUserId = System.getProperty("mqUserId");
064                                String mqPassword = System.getProperty("mqPassword");
065                                QueueConnectionFactory factory = new ActiveMQConnectionFactory(jmsServer);              
066                                connection = factory.createQueueConnection(mqUserId,  mqPassword);
067                        }else {
068                                // jndi接続用
069                                Context ctx = new InitialContext();
070                                QueueConnectionFactory factory = (QueueConnectionFactory)ctx.lookup("java:comp/env/" + jmsServer);
071                                connection = factory.createQueueConnection();
072                        }
073                        
074                        connection.start();
075                        
076                        // Receiveの作成
077                        session = connection.createQueueSession(false, QueueSession.CLIENT_ACKNOWLEDGE);
078                        
079                        // 初期化
080                        listReceiver = new ArrayList<QueueReceiver>();
081                }catch(Exception e) {
082                        throw new RuntimeException("MQサーバの接続に失敗しました。:" + e.getMessage());
083                }
084        }
085
086        /**
087         * 受信処理
088         * メッセージキューの受信の処理を行います。
089         * 
090         */
091        @Override
092        public QueueInfo receive(final String queueName) {
093                QueueInfo queueInfo = null;
094                
095                try {
096                        Queue queue = session.createQueue(queueName);
097                        receiver = session.createReceiver(queue);
098                        
099                        TextMessage msg = (TextMessage)receiver.receive(1000);
100                        
101                        if(msg != null) {
102                                // メッセージ受信の確認応答
103                                msg.acknowledge();
104                                
105                                // メッセージの設定
106                                queueInfo = new QueueInfo();
107                                queueInfo.setMessage(msg.getText());
108                        }
109                }catch(Exception e) {
110                        throw new RuntimeException(e.getMessage());
111                }finally {
112                        try {
113                                receiver.close();
114                        }catch(Exception e) {}
115                }
116                
117                return queueInfo;
118        }
119        
120        /**
121         * リスナーの起動
122         * 指定したキュー名に対して、
123         * MessageListenerのリスナーを設定します。
124         * 
125         * @param queueName キュー名
126         * @param listener MessageListerを実装したクラス
127         */
128        @Override
129        public void setListener(final String queueName, MessageListener listener) {
130                QueueReceiver receiver = null;
131                try {   
132                        Queue queue = session.createQueue(queueName);
133                        receiver = session.createReceiver(queue);
134                        receiver.setMessageListener(listener);
135                        
136                        // リスナーの起動
137                        listReceiver.add(receiver);
138                }catch(JMSException e) {
139                        throw new RuntimeException("リスナーの起動に失敗しました。" + e.getMessage());
140                }
141        }
142
143        /**
144         * クローズリスナー
145         * レシーバーをクローズすることで、
146         * リスナーの処理を終了します。
147         */
148        public void closeListener() {
149                for(QueueReceiver receiver: listReceiver) {
150                        try {
151                                receiver.close();
152                        }catch(Exception e) {
153                                
154                        }
155                }
156                
157                // 初期化
158                listReceiver = null;
159                listReceiver = new ArrayList<QueueReceiver>(); 
160        }
161        
162        /**
163         * クローズ処理
164         * クローズ処理を行います。
165         */
166        @Override
167        public void close() {
168                if(receiver != null) {
169                        try {
170                                receiver.close();
171                        }catch(Exception e) {
172                                
173                        }
174                }
175                if(session != null) {
176                        try {
177                                session.close();
178                        }catch(Exception e) {
179                                
180                        }
181                }
182                if(connection != null) {
183                        try {
184                                connection.close();
185                        }catch(Exception e) {
186                                
187                        }
188                }
189        }
190
191        /**
192         * バッチ処理判定フラグを設定します。
193         * 
194         * @param batchFlg バッチ処理判定フラグ
195         */
196        public void setBatchFlg(final Boolean batchFlg) {
197                batch = batchFlg;
198        }
199        
200        /**
201         * 検証用メソッド
202         * テスト用のメソッドです。
203         * 
204         * @param args 引数
205         */
206        public static void main(String[] args) {
207                QueueReceive receive = new QueueReceive_MQ();
208                final String jmsServer = "tcp://localhost:61616";
209                
210                // バッチフラグにtrueを設定
211                // 未設定の場合は、tomcatのjndi接続処理が実行されます。
212                receive.setBatchFlg(true);
213                
214                // 認証情報の設定
215                System.setProperty("mqUserId", "admin");
216                System.setProperty("mqPassword", "admin");
217                
218                // 接続
219                receive.connect(jmsServer, null, null);
220                
221                // 処理対象のキュー名
222                String queueName = "queue01";
223                
224                
225                // ** 1件受信する場合
226                QueueInfo queueInfo = receive.receive(queueName);
227                if(queueInfo != null) {
228                        System.out.println("message:" + queueInfo.getMessage());        
229                }else {
230                        System.out.println("キューが登録されていません。");
231                }
232                
233//              // ** リスナーを設定して、受信を検知すると処理を実行します。(MQのみ)
234//              // MessageListerを実装した、QueueReceiveListenerクラスを作成します。
235//              MessageListener listener = new QueueReceiveListener();
236//              receive.setListener(queueName, listener);
237//              // 複数のキューにリスナーを設定することも可能です。
238//              receive.setListener("queue02", listener);
239//              
240//              try {
241//                      // 1分間リスナーを起動しておく場合の、プロセス待機処理
242//                      Thread.sleep(60 * 1000);
243//              }catch(InterruptedException e) {
244//                      throw new RuntimeException(e.getMessage());
245//              }
246                
247                // リスナー利用時は、closeListenerを実行して、解放してください。
248                receive.closeListener();
249                
250                // 終了処理
251                receive.close();
252        }
253        
254        /**
255         * QueueReceiveリスナークラス
256         * リスナー用のクラスです。
257         * MQに設定することで、メッセージが受信されると、
258         * 自動的にonMessageメソッドが実行されます。
259         *
260         */
261        static class QueueReceiveListener implements MessageListener {
262                /**
263                 * メッセージ受信処理
264                 * MQサーバにメッセージが受信されると、
265                 * メソッドの処理が行われます。
266                 * 
267                 * @param Message 受信メッセージ
268                 */
269                @Override
270                public void onMessage(final Message message) {
271
272                        // メッセージ受信
273                        TextMessage msg = (TextMessage) message;
274                        String msgText = "";
275
276                        try {
277                                // キューサーバのメッセージを取得
278                                msgText = msg.getText();
279                                // メーッセージの受信応答を返します。
280                                msg.acknowledge();
281                                
282                                System.out.println("message:" + msgText);
283
284                        } catch (JMSException e) {
285                                throw new RuntimeException(e.getMessage());
286                        }
287                }
288        }
289
290}