001package org.opengion.fukurou.queue; 002 003import java.util.ArrayList; 004import java.util.List; 005import java.util.Map; 006 007import javax.jms.MessageListener; 008 009import com.amazonaws.ClientConfiguration; 010import com.amazonaws.Protocol; 011import com.amazonaws.auth.AWSCredentials; 012import com.amazonaws.auth.AWSStaticCredentialsProvider; 013import com.amazonaws.auth.BasicAWSCredentials; 014import com.amazonaws.auth.InstanceProfileCredentialsProvider; 015import com.amazonaws.regions.Regions; 016import com.amazonaws.services.sqs.AmazonSQS; 017import com.amazonaws.services.sqs.AmazonSQSClientBuilder; 018import com.amazonaws.services.sqs.model.DeleteMessageRequest; 019import com.amazonaws.services.sqs.model.Message; 020import com.amazonaws.services.sqs.model.ReceiveMessageRequest; 021import com.amazonaws.util.StringUtils; 022 023/** 024 * SQSのメッセージ受信用クラス。 025 * 026 * SQSサーバからメッセージキューを受信用のクラスです。 027 * 028 * @og.group メッセージ連携 029 * 030 * @og.rev 5.10.15.2 (2019/09/20) 新規作成 031 * 032 * @version 5 033 * @author oota 034 * @since JDK7 035 */ 036public class QueueReceive_SQS implements QueueReceive{ 037 private AmazonSQS client = null; 038 private ReceiveMessageRequest request = null; 039 private String sqsServer = ""; 040 private final static String GROUPID_ATTR = "MessageGroupId"; 041 042 /** 043 * 接続処理 044 * メッセージキューサーバに接続します。 045 * 046 * @param jmsServer jsmサーバ 047 * @param sqsAccessKey sqs用awsアクセスキー 048 * @param sqsSecretKey sqs用awsシークレットキー 049 */ 050 public void connect(final String jmsServer, final String sqsAccessKey, final String sqsSecretKey) { 051 // jmsServerをsqsServerとして保持 052 sqsServer = jmsServer; 053 054 if (StringUtils.isNullOrEmpty(sqsAccessKey)) { 055 // IAMロールによる認証 056 client = AmazonSQSClientBuilder.standard() 057 .withCredentials(new InstanceProfileCredentialsProvider(false)).build(); 058 } else { 059 // アクセスキーによる 060 AWSCredentials credentials = new BasicAWSCredentials(sqsAccessKey, sqsSecretKey); 061 062 // proxy環境での検証用 063// ClientConfiguration conf = new ClientConfiguration(); 064// conf.setProtocol(Protocol.HTTPS); 065// conf.setProxyHost("mtc-px15"); 066// conf.setProxyPort(8081); 067 068 client = AmazonSQSClientBuilder.standard() 069 .withCredentials(new AWSStaticCredentialsProvider(credentials)) 070// .withClientConfiguration(conf) 071 .withRegion(Regions.AP_NORTHEAST_1.getName()) 072 .build(); 073 } 074 075 // グループIDを取得できるように、属性を設定 076 request = new ReceiveMessageRequest(jmsServer); 077 078 List<String> attributeNames = new ArrayList<String>(); 079 attributeNames.add(GROUPID_ATTR); 080 request.setAttributeNames(attributeNames); 081 } 082 083 /** 084 * 再受信処理 085 * 再受信処理を行います。 086 * 087 * @param queueName キュー名 088 */ 089 public QueueInfo receive(final String queueName) { 090 QueueInfo queueInfo = null; 091 092 // 受信処理 093 List<Message> messages = client.receiveMessage(request).getMessages(); 094 if(messages.size() > 0) { 095 final Message message = messages.get(0); 096 097 Map<String, String> attriMap = message.getAttributes(); 098 final String groupId = attriMap.get(GROUPID_ATTR); 099 100 queueInfo = new QueueInfo(); 101 queueInfo.setMessage(message.getBody()); 102 queueInfo.setSqsFifoGroupId(groupId); 103 104 // キューの削除 105 client.deleteMessage(new DeleteMessageRequest(sqsServer, message.getReceiptHandle())); 106 } 107 108 return queueInfo; 109 } 110 111 /** 112 * 終了処理 113 * APIによる通信処理のため、 114 * SQSでは終了処理は不要です。 115 * 116 */ 117 public void close() { 118 // 処理なし 119 } 120 121 122 /** 123 * SQSではリスナー処理は出来ません。 124 * 125 * @param queueName キュー名 126 * @param listener MessageListerを実装したクラス 127 */ 128 @Override 129 public void setListener(final String queueName, MessageListener listener) { 130 throw new RuntimeException("SQSではsetListenerは利用できません。"); 131 } 132 133 /** 134 * SQSではリスナー処理は出来ません。 135 */ 136 @Override 137 public void closeListener() { 138 throw new RuntimeException("SQSではcloseListenerは利用できません。"); 139 } 140 141 /** 142 * 検証用メソッド 143 * 検証用のmainメソッドです。 144 * 145 * 下記の値を設定してください。 146 * jmsServer: SQSのURL 147 * sqsAccessKey: SQSの権限を持ったAWSユーザのアクセスキー 148 * sqsSecretKey: 上記と同様のシークレットキー。 149 * 150 * @param args 引数 151 */ 152 public static void main(String[] args) { 153 QueueReceive_SQS queueReceive_SQS = new QueueReceive_SQS(); 154 final String jmsServer = System.getProperty("jmsServer"); 155 final String sqsAccessKey = System.getProperty("sqsAccessKey"); 156 final String sqsSecretKey = System.getProperty("sqsSecretKey"); 157 queueReceive_SQS.connect(jmsServer, sqsAccessKey, sqsSecretKey); 158 159 // 受信処理 160 // SQSはgropuIdの指定は出来ません. 161 final QueueInfo queueInfo = queueReceive_SQS.receive(null); 162 163 if(queueInfo != null) { 164 System.out.println("groupId:" + queueInfo.getSqsFifoGroupId()); 165 System.out.println("message:" + queueInfo.getMessage()); 166 }else { 167 System.out.println("キューが存在しません"); 168 } 169 } 170 171 /** 172 * バッチ処理判定フラグを設定します。 173 * 174 * @param batchFlg バッチ処理判定フラグ 175 */ 176 public void setBatchFlg(final Boolean batchFlg) { 177 // SQSはバッチでも同じ処理になります。 178 } 179}