001package org.opengion.fukurou.queue;
002
003import java.util.ArrayList;
004import java.util.List;
005import java.util.Map;
006
007import javax.jms.MessageListener;
008
009import com.amazonaws.ClientConfiguration;
010import com.amazonaws.Protocol;
011import com.amazonaws.auth.AWSCredentials;
012import com.amazonaws.auth.AWSStaticCredentialsProvider;
013import com.amazonaws.auth.BasicAWSCredentials;
014import com.amazonaws.auth.InstanceProfileCredentialsProvider;
015import com.amazonaws.regions.Regions;
016import com.amazonaws.services.sqs.AmazonSQS;
017import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
018import com.amazonaws.services.sqs.model.DeleteMessageRequest;
019import com.amazonaws.services.sqs.model.Message;
020import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
021import com.amazonaws.util.StringUtils;
022
023/**
024 * SQSのメッセージ受信用クラス。
025 * 
026 * SQSサーバからメッセージキューを受信用のクラスです。
027 * 
028 * @og.group メッセージ連携
029 *
030 * @og.rev 5.10.15.2 (2019/09/20) 新規作成
031 * 
032 * @version 5
033 * @author oota
034 * @since JDK7
035 */
036public class QueueReceive_SQS implements QueueReceive{
037        private AmazonSQS client = null;
038        private ReceiveMessageRequest request = null;
039        private String sqsServer = "";
040        private final static String GROUPID_ATTR = "MessageGroupId";
041        
042        /**
043         * 接続処理
044         * メッセージキューサーバに接続します。
045         * 
046         *  @param jmsServer jsmサーバ
047         *  @param sqsAccessKey sqs用awsアクセスキー
048         *  @param sqsSecretKey sqs用awsシークレットキー
049         */
050        public void connect(final String jmsServer, final String sqsAccessKey, final String sqsSecretKey) {
051                // jmsServerをsqsServerとして保持
052                sqsServer = jmsServer;
053                
054                if (StringUtils.isNullOrEmpty(sqsAccessKey)) {
055                        // IAMロールによる認証
056                        client = AmazonSQSClientBuilder.standard()
057                                        .withCredentials(new InstanceProfileCredentialsProvider(false)).build();
058                } else {
059                        // アクセスキーによる
060                        AWSCredentials credentials = new BasicAWSCredentials(sqsAccessKey, sqsSecretKey);
061                        
062        // proxy環境での検証用
063//                      ClientConfiguration conf = new ClientConfiguration();
064//                      conf.setProtocol(Protocol.HTTPS);
065//                      conf.setProxyHost("mtc-px15");
066//                      conf.setProxyPort(8081);
067                        
068                        client = AmazonSQSClientBuilder.standard()
069                                        .withCredentials(new AWSStaticCredentialsProvider(credentials))
070//                                      .withClientConfiguration(conf)
071                                        .withRegion(Regions.AP_NORTHEAST_1.getName())
072                                        .build();
073                }
074                
075                // グループIDを取得できるように、属性を設定
076                request = new ReceiveMessageRequest(jmsServer);
077                
078                List<String> attributeNames = new ArrayList<String>();
079                attributeNames.add(GROUPID_ATTR);
080                request.setAttributeNames(attributeNames);
081        }
082        
083        /**
084         * 再受信処理
085         * 再受信処理を行います。
086         * 
087         * @param queueName キュー名
088         */
089        public QueueInfo receive(final String queueName) {
090                QueueInfo queueInfo = null;
091                
092                // 受信処理
093                List<Message> messages = client.receiveMessage(request).getMessages();
094                if(messages.size() > 0) {
095                        final Message message = messages.get(0);
096                        
097                        Map<String, String> attriMap = message.getAttributes();
098                        final String groupId = attriMap.get(GROUPID_ATTR);
099                        
100                        queueInfo = new QueueInfo();
101                        queueInfo.setMessage(message.getBody());
102                        queueInfo.setSqsFifoGroupId(groupId);
103                        
104                        // キューの削除
105                        client.deleteMessage(new DeleteMessageRequest(sqsServer, message.getReceiptHandle()));
106                }
107                
108                return queueInfo;
109        }
110        
111        /**
112         * 終了処理
113         * APIによる通信処理のため、
114         * SQSでは終了処理は不要です。
115         * 
116         */
117        public void close() {
118                // 処理なし
119        }
120        
121
122        /**
123         * SQSではリスナー処理は出来ません。
124         * 
125         * @param queueName キュー名
126         * @param listener MessageListerを実装したクラス
127         */
128        @Override
129        public void setListener(final String queueName, MessageListener listener) {
130                throw new RuntimeException("SQSではsetListenerは利用できません。");
131        }
132
133        /**
134         * SQSではリスナー処理は出来ません。
135         */
136        @Override
137        public void closeListener() {
138                throw new RuntimeException("SQSではcloseListenerは利用できません。");
139        }
140        
141        /**
142         * 検証用メソッド
143         * 検証用のmainメソッドです。
144         * 
145         * 下記の値を設定してください。
146         * jmsServer: SQSのURL
147         * sqsAccessKey: SQSの権限を持ったAWSユーザのアクセスキー
148         * sqsSecretKey: 上記と同様のシークレットキー。
149         * 
150         * @param args 引数
151         */
152        public static void main(String[] args) {
153                QueueReceive_SQS queueReceive_SQS = new QueueReceive_SQS();
154                final String jmsServer = System.getProperty("jmsServer");
155                final String sqsAccessKey = System.getProperty("sqsAccessKey");
156                final String sqsSecretKey = System.getProperty("sqsSecretKey");
157                queueReceive_SQS.connect(jmsServer, sqsAccessKey, sqsSecretKey);
158                
159                // 受信処理
160                // SQSはgropuIdの指定は出来ません.
161                final QueueInfo queueInfo = queueReceive_SQS.receive(null);
162                
163                if(queueInfo != null) {
164                        System.out.println("groupId:" + queueInfo.getSqsFifoGroupId());
165                        System.out.println("message:" + queueInfo.getMessage());
166                }else {
167                        System.out.println("キューが存在しません");
168                }
169        }
170        
171        /**
172         * バッチ処理判定フラグを設定します。
173         * 
174         * @param batchFlg バッチ処理判定フラグ
175         */
176        public void setBatchFlg(final Boolean batchFlg) {
177                // SQSはバッチでも同じ処理になります。
178        }
179}