001/* 002 * Copyright (c) 2009 The openGion Project. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, 013 * either express or implied. See the License for the specific language 014 * governing permissions and limitations under the License. 015 */ 016package org.opengion.fukurou.process; 017 018import java.util.HashMap; 019import java.util.Map; 020 021import org.opengion.fukurou.queue.QueueInfo; 022import org.opengion.fukurou.queue.QueueSend; 023import org.opengion.fukurou.queue.QueueSendFactory; 024import org.opengion.fukurou.util.Argument; 025import org.opengion.fukurou.util.LogWriter; 026 027/** 028 *Process_QueueSendは、MQ or SQSにメッセージキューを送信する、 029 *FirstProcessインタフェースの実装クラスです。 030 * 031 *@og.formSample 032 * 1)MQにメッセージを送信する場合 033 * java -DmqUserId=[mqユーザid] -DmqPassword=[mqパスワード] -cp [クラスパス] org.opengion.fukurou.process.MainProcess org.opengion.fukurou.process.Process_Logger -logFile=System.out org.opengion.fukurou.process.Process_QueueSend -queueType=MQ -jmsServer=[mqサーバー] -groupId=[グループID] -message=[メッセージ] 034 * 2)SQSにメッセージを送信する場合 035 * java -cp [クラスパス] org.opengion.fukurou.process.MainProcess org.opengion.fukurou.process.Process_Logger -logFile=System.out org.opengion.fukurou.process.Process_QueueSend -accessKey=[awsアクセスキー] -secretKey=[awsシークレットキー] -queueType=SQS -jmsServer=[sqsサーバー] -groupId=[グループID] -message=[メッセージ] 036 * 037 *※proxy環境から、外部のMQやSQSサーバにはプロキシ情報を渡して、実行する必要があります。 038 *-Dhttp.proxyHost=[proxyホスト] -Dhttp.proxyPort=[proxyポート] -Dhttps.proxyHost=[proxyホスト] -Dhttps.proxyPort=[proxyポート] 039 * 040 * -queueType=キュータイプ :MQ or SQS 041 * -jmsServer=キューサーバー :キューサーバーのURLを指定 042 * -groupId=グループID :キュー格納先のグループID 043 * -message=送信メッセージ :キューに格納するメッセージ 044 * [-sccessKey=アクセスキー] :SQSに接続用のアクセスキーです(aws上で取得) 045 * [-secretKey=シークレットキー] :SQSに接続用のシークレットキーです(aws上で取得) 046 * 047 * コマンド例 048 * java -Dhttp.proxyHost=proxyhost -Dhttp.proxyPort=8080 -Dhttps.proxyHost=proxyhost -Dhttps.proxyPort=8080 -cp H:\sample\* ^ 049 * org.opengion.fukurou.process.MainProcess ^ 050 * org.opengion.fukurou.process.Process_Logger -logFile=System.out ^ 051 * org.opengion.fukurou.process.Process_QueueSend -accessKey=[アクセスキー] -secretKey=[シークレットキー] -queueType=SQS ^ 052 * -jmsServer=https://sqs.ap-northeast-1.amazonaws.com/000000000000/otfifo01.fifo -groupId=sample -message=sendMsg 053 * 054 * @og.rev 5.10.17.1 (2019/11/15) 新規追加 055 * 056 * @verion 5 057 * @since JDK7 058 */ 059public class Process_QueueSend extends AbstractProcess implements FirstProcess{ 060 private static final String name = ""; 061 private static final Map<String, String> mustProperty; 062 private static final Map<String, String> usableProperty; 063 064 QueueSend queueSend; 065 066 private String queueType; 067 private String jmsServer; 068 private String groupId; 069 private String message; 070 071 static { 072 mustProperty = new HashMap<String,String>(); 073 mustProperty.put("queueType", "キュータイプ"); 074 mustProperty.put("jmsServer", "jms接続先"); 075 mustProperty.put("groupId", "グループID"); 076 mustProperty.put("message", "メッセージ"); 077 078 usableProperty = new HashMap<String,String>(); 079 // SQS用 080 usableProperty.put("accessKey", "アクセスキ"); 081 usableProperty.put("secretKey", "シークレットキー"); 082 } 083 084 /** 085 * コンストラクター 086 */ 087 public Process_QueueSend() { 088 super(name, mustProperty, usableProperty); 089 } 090 091 /** 092 * プロセスの初期化を行います。初めに一度だけ、呼び出されます。 093 * 初期処理(ファイルオープン、DBオープン等)に使用します。 094 * 095 * @param paramProcess データベースの接続先情報などを持っているオブジェクト 096 */ 097 @Override 098 public void init(ParamProcess paramProcess) { 099 Argument arg = getArgument(); 100 101 queueType = arg.getProparty("queueType"); 102 jmsServer = arg.getProparty("jmsServer"); 103 groupId = arg.getProparty("groupId"); 104 message = arg.getProparty("message"); 105 final String accessKey = arg.getProparty("accessKey"); 106 final String secretKey = arg.getProparty("secretKey"); 107 108 queueSend = QueueSendFactory.newQueueSend(queueType); 109 110 // バッチ実行 111 queueSend.setBatchFlg(true); 112 113 // 接続処理 114 queueSend.connect(jmsServer, accessKey, secretKey); 115 } 116 117 /** 118 * プロセスの終了を行います。最後に一度だけ、呼び出されます。 119 * 終了処理(ファイルクローズ、DBクローズ等)に使用します。 120 * 121 * @param isOK トータルで、OKだったかどうか[true:成功/false:失敗] 122 */ 123 @Override 124 public void end(boolean isOK) { 125 queueType = ""; 126 jmsServer = ""; 127 groupId = ""; 128 message = ""; 129 130 if(queueSend != null) { 131 queueSend.close(); 132 } 133 queueSend = null; 134 } 135 136 /** 137 * このデータの処理において、次の処理が出来るかどうかを問い合わせます。 138 * この呼び出し1回毎に、次のデータを取得する準備を行います。 139 * 140 * @return 処理できる:true / 処理できない:false 141 **/ 142 @Override 143 public boolean next() { 144 QueueInfo queueInfo = new QueueInfo(); 145 queueInfo.setJmsServer(jmsServer); 146 queueInfo.setMqQueueName(groupId); 147 queueInfo.setSqsFifoGroupId(groupId); 148 queueInfo.setMessage(message); 149 150 queueSend.sendMessage(queueInfo); 151 152 return false; 153 } 154 155 /** 156 * 最初に、 行データである LineModel を作成します 157 * FirstProcess は、次々と処理をチェインしていく最初の行データを 158 * 作成して、後続の ChainProcess クラスに処理データを渡します。 159 * 160 * @param rowNo 処理中の行番号 161 * 162 * @return 処理変換後のLineModel 163 * */ 164 @Override 165 public LineModel makeLineModel(int rowNo) { 166 // 後続のChainProcessは実行しません。 167 return null; 168 } 169 170 /** 171 * プロセスの処理結果のレポート表現を返します。 172 * 処理プログラム名、入力件数、出力件数などの情報です。 173 * この文字列をそのまま、標準出力に出すことで、結果レポートと出来るような 174 * 形式で出してください。 175 * 176 * @return 処理結果のレポート 177 */ 178 @Override 179 public String report() { 180 final String report = "[" + getClass().getName() + "]" + CR 181 + TAB + "queueType:" + queueType + CR 182 + TAB + "jmsServer:" + jmsServer + CR 183 + TAB + "gropuId:" + groupId + CR 184 + TAB + "message:" + message; 185 return report; 186 } 187 188 /** 189 * このクラスの使用方法を返します。 190 * 191 * @return このクラスの使用方法 192 */ 193 @Override 194 public String usage() { 195 StringBuilder buf = new StringBuilder(); 196 197 buf.append("Process_QueueSendは、MQ or SQSにメッセージキューを送信する、").append( CR ); 198 buf.append("FirstProcessインタフェースの実装クラスです。").append(CR); 199 buf.append(CR); 200 buf.append("-queueType=キュータイプ :MQ or SQS").append(CR); 201 buf.append("-jmsServer=キューサーバー :キューサーバーのURLを指定").append(CR); 202 buf.append("-groupId=グループID :キュー格納先のグループID").append(CR); 203 buf.append("-message=送信メッセージ :キューに格納するメッセージ").append(CR); 204 buf.append("[-sccessKey=アクセスキー]").append(CR); 205 buf.append("[-secretKey=シークレットキー]").append(CR); 206 buf.append( CR ).append( CR ); 207 buf.append( getArgument().usage() ).append( CR ); 208 209 return buf.toString(); 210 } 211 212 /** 213 * このクラスは、main メソッドから実行できません。 214 * 215 * @param args コマンド引数配列 216 */ 217 public static void main(final String[] args) { 218 LogWriter.log(new Process_QueueSend().usage()); 219 } 220 221}