001/* 002 * Copyright (c) 2009 The openGion Project. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, 013 * either express or implied. See the License for the specific language 014 * governing permissions and limitations under the License. 015 */ 016 017package org.opengion.fukurou.queue; 018 019import org.opengion.fukurou.util.StringUtil; 020 021import com.amazonaws.auth.AWSCredentials; 022import com.amazonaws.auth.AWSStaticCredentialsProvider; 023import com.amazonaws.auth.BasicAWSCredentials; 024import com.amazonaws.auth.InstanceProfileCredentialsProvider; 025import com.amazonaws.regions.Regions; 026import com.amazonaws.services.sqs.AmazonSQS; 027import com.amazonaws.services.sqs.AmazonSQSClientBuilder; 028import com.amazonaws.services.sqs.model.SendMessageRequest; 029 030/** 031 * SQSサーバへのメッセージキュー送信クラス 032 * 033 * SQSサーバへのメッセージキュー送信用のクラスです。 034 * AmazonSQSへの送信が可能です。 035 * 036 * @og.group メッセージ連携 037 * 038 * @og.rev 5.10.14.0 (2019/08/01) 新規作成 039 * 040 * @version 5 041 * @author oota 042 * @since JDK7 043 */ 044public class QueueSend_SQS implements QueueSend { 045 String sqsUrl = ""; 046 Boolean batch = false; 047 AmazonSQS client; 048 049 /** 050 * 接続処理 051 * SQSサーバに接続を行います。 052 * 053 * @og.rev 5.10.15.0 (2019/08/30) 引数追加対応 054 * 055 * @param jmsServer 接続先url 056 * @param sqsAccessKey sqsアクセスキー 057 * @param sqsSecretKey sqsシークレットキー 058 */ 059 @Override 060 public void connect(final String jmsServer, final String sqsAccessKey, final String sqsSecretKey) { 061 sqsUrl = jmsServer; 062 063 try { 064 if (StringUtil.isNull(sqsAccessKey)) { 065 // IAMロールによる認証 066 client = AmazonSQSClientBuilder.standard() 067 .withCredentials(new InstanceProfileCredentialsProvider(false)).build(); 068 } else { 069 AWSCredentials credentials = new BasicAWSCredentials(sqsAccessKey, sqsSecretKey); 070 071// proxy環境でのテスト用。proxyホストの情報を入力して、実行します。 072// ClientConfiguration conf = new ClientConfiguration(); 073// conf.setProtocol(Protocol.HTTPS); 074// conf.setProxyHost("mtc-px14"); 075// conf.setProxyPort(8081); 076 077 client = AmazonSQSClientBuilder.standard() 078 .withCredentials(new AWSStaticCredentialsProvider(credentials)) 079// .withClientConfiguration(conf) 080 .withRegion(Regions.AP_NORTHEAST_1.getName()).build(); 081 } 082 } catch (Exception e) { 083 throwErrMsg("SQSサーバの接続に失敗しました。" + e.getMessage()); 084 } 085 } 086 087 /** 088 * エラーメッセージ送信 089 * 090 * @og.rev 5.10.15.0 (2019/08/30) hybs除外 091 * 092 * @param errMsg エラーメッセージ 093 */ 094 public void throwErrMsg(final String errMsg) { 095 throw new RuntimeException( errMsg ); 096 } 097 098 /** 099 * メッセージ送信 100 * MQサーバにメッセージキューを送信します。 101 * 102 * @param info 送信メッセージキュー情報 103 */ 104 @Override 105 public void sendMessage(QueueInfo info) { 106 if(client == null) { 107 throwErrMsg("SQSサーバに接続されていません。"); 108 } 109 110 SendMessageRequest request = new SendMessageRequest(sqsUrl, info.getMessage()); 111 112 /** 情報設定 */ 113 // FIFOタイプのみ設定します。 114 // グループID 115 request.setMessageGroupId(info.getSqsFifoGroupId()); 116 // 重複禁止ID 117 request.setMessageDeduplicationId(info.getSqsFifoDedupliId()); 118 119 /** 送信処理 */ 120 try { 121 client.sendMessage(request); 122 } catch (Exception e) { 123 throwErrMsg("キューの送信処理に失敗しました。" + e.getMessage()); 124 } 125 } 126 127 /** 128 * クローズ処理 129 * SQSサーバとの接続をクローズします。 130 */ 131 @Override 132 public void close() { 133 if(client != null) { 134 client.shutdown(); 135 } 136 } 137 138 @Override 139 public void setBatchFlg(final Boolean batchFlg) { 140 // SQSの場合は、バッチとWeb上で共通処理のため、フラグ設定の影響はありません 141 batch = batchFlg; 142 } 143 144 /** 145 * テスト用メソッド 146 * proxy環境下で実行する場合は、 147 * connectメソッドのproxy設定のコメントを外して、 148 * 実行する必要があります。 149 * 150 * @param args 引数 151 */ 152 public static void main(String[] args) { 153 // 接続先 154 String url = System.getProperty("URL"); 155 // グループID 156 String groupId = System.getProperty("GROUPID"); 157 // アクセスキー 158 String accessKey = System.getProperty("CLOUD_SQS_ACCESS_KEY"); 159 // シークレットキー 160 String secretKey = System.getProperty("CLOUD_SQS_SECRET_KEY"); 161 // 送信メッセージ(乱数は重複排除IDのテスト用) 162 String message = "サンプル送信メッセージ:" + Math.random(); 163 164 // SQSにメッセージ送信 165 QueueSend queueSend = new QueueSend_SQS(); 166 167 // キュー情報の設定 168 QueueInfo queueInfo = new QueueInfo(); 169 170 queueInfo.setSqsFifoGroupId(groupId); 171 172 // メッセージ 173 queueInfo.setMessage(message); 174 175 // 接続 176 queueSend.connect(url, accessKey, secretKey); 177 // 送信 178 queueSend.sendMessage(queueInfo); 179 // クローズ 180 queueSend.close(); 181 } 182}