001/*
002 * Copyright (c) 2009 The openGion Project.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
013 * either express or implied. See the License for the specific language
014 * governing permissions and limitations under the License.
015 */
016package org.opengion.fukurou.process;
017
018import java.io.File;
019import java.util.HashMap;
020import java.util.Map;
021
022import org.opengion.fukurou.business.BizUtil;
023import org.opengion.fukurou.queue.QueueInfo;
024import org.opengion.fukurou.queue.QueueReceive;
025import org.opengion.fukurou.queue.QueueReceiveFactory;
026import org.opengion.fukurou.util.Argument;
027import org.opengion.fukurou.util.LogWriter;
028
029/**
030 *Process_QueueReceiveは、MQ or SQSにメッセージキューを受信する、
031 *FirstProcessインタフェースの実装クラスです。
032 *
033 *受信したメッセージを指定したbizlogicファイルで処理を行います。
034 *bizlogicファイルでデータベース処理を行うため、
035 *Process_DBParamを前処理として実行します。
036 *
037 *@og.formSample
038 * 1)MQからメッセージを受信する場合
039 *   java -DmqUserId=[mqユーザid] -DmqPassword=[mqパスワード] -cp [クラスパス] org.opengion.fukurou.process.MainProcess org.opengion.fukurou.process.Process_Logger -logFile=System.out org.opengion.fukurou.process.Process_DBParam -infoUSER=[ユーザーID] -infoPGID=process -configFile=[DBConfigパス] org.opengion.fukurou.process.Process_QueueReceive -accessKey=[アクセスキー] -secretKey=[シークレットキー] -webinfDir=[WEB-INFパス] -bizsrcDir=[bizディレクトリパス] -queueType=MQ -jmsServer=[mqサーバのurl] -groupId=[グループID] -systemId=[システムID] -logicName=[bizlogicファイル名]
040 * 2)SQSからメッセージを受信する場合
041 *   java -cp [クラスパス] org.opengion.fukurou.process.MainProcess org.opengion.fukurou.process.Process_Logger -logFile=System.out org.opengion.fukurou.process.Process_DBParam -infoUSER=[ユーザーID] -infoPGID=process -configFile=[DBConfigパス] org.opengion.fukurou.process.Process_QueueReceive -accessKey=[アクセスキー] -secretKey=[シークレットキー] -webinfDir=[WEB-INFパス] -bizsrcDir=[bizディレクトリパス] -queueType=SQS -jmsServer=[sqsサーバのurl] -groupId=[グループID] -systemId=[システムID] -logicName=[bizlogicファイル名]
042 *
043 *※proxy環境から、外部のMQやSQSサーバにはプロキシ情報を渡して、実行する必要があります。
044 *-Dhttp.proxyHost=[proxyホスト] -Dhttp.proxyPort=[proxyポート] -Dhttps.proxyHost=[proxyホスト] -Dhttps.proxyPort=[proxyポート]
045 *
046 * -queueType=キュータイプ       :MQ or SQS
047 * -jmsServer=キューサーバー     :キューサーバーのURLを指定
048 * -groupId=グループID           :キュー格納先のグループID
049 * -webinfDir=WEB-INFパス        :WEB-INFのディレクトリパス(bizlogic用)
050 * -bizsrcDir=bizファイルパス    :bizファイルディレクトリパス(bizlogic用)
051 * -systemId=システムID          :システムID(bizlogic用)
052 * -logicName=ロジックファイル名 :bizLogicのファイル名(bizlogic用)
053 * [-sccessKey=アクセスキー]     :SQSに接続用のアクセスキーです(aws上で取得)
054 * [-secretKey=シークレットキー] :SQSに接続用のシークレットキーです(aws上で取得)
055 * 
056 * コマンド例
057 * java -DmqUserId=admin -DmqPassword=admin -Dhttps.proxyHost=xxx-px^
058 *  -Dhttps.proxyPort=8081 -cp H:\sample\* ^
059 *  org.opengion.fukurou.process.MainProcess ^
060 *  org.opengion.fukurou.process.Process_Logger -logFile=System.out org.opengion.fukurou.process.Process_DBParam ^
061 *  -infoUSER=username -infoPGID=process -configFile=H:\sample\DBConfig.xml ^
062 *  org.opengion.fukurou.process.Process_QueueReceive ^
063 *  -webinfDir=H:\sample\gf\WEB-INF -bizsrcDir=H:\sample\gf\src\biz -queueType=MQ ^
064 *  -jmsServer=tcp://localhost:61616 -groupId=sample002 -systemId=GF -logicName=gf.TEST03g.opengion.fukurou.process.Process_QueueReceive ^
065 *  -webinfDir=H:\sample\gf\WEB-INF -bizsrcDir=H:\sample\gf\src\biz -queueType=MQ -jmsServer=tcp://localhost:61616 -groupId=sample002 ^
066 *  -systemId=GF -logicName=gf.TEST03
067 *
068 * @og.rev 5.10.17.1 (2019/11/15) 新規追加
069 *
070 * @verion 5
071 * @since JDK7
072 */
073public class Process_QueueReceive extends AbstractProcess implements FirstProcess{
074        private static String name;
075        private static Map<String, String> mustProperty;
076        private static Map<String, String> usableProperty;
077
078        QueueReceive queueReceive;
079
080        private String queueType;
081        private String jmsServer;
082        private String groupId;
083        private String systemId;
084        private String logicName;
085        private String receiveMessage;
086
087        static {
088                mustProperty = new HashMap<String, String>();
089                mustProperty.put("queueType", "キュータイプ");
090                mustProperty.put("jmsServer", "jms接続先");
091                mustProperty.put("groupId", "グループID");
092                mustProperty.put("webinfDir", "WEB-INFディレクトリ");
093                mustProperty.put("bizsrcDir", "bizのソースファイルディレクトリ");
094                mustProperty.put("systemId", "システムID");
095                mustProperty.put("logicName", "ロジック名");
096
097                usableProperty = new HashMap<String, String>();
098                // SQS用
099                usableProperty.put("accessKey", "アクセスキ");
100                usableProperty.put("secretKey",  "シークレットキー");
101        }
102
103        /**
104         * コンストラクター
105         */
106        public Process_QueueReceive() {
107                super(name, mustProperty, usableProperty);
108        }
109
110        /**
111         * このクラスの使用方法を返します。
112         *
113         * @return      このクラスの使用方法
114         */
115        @Override
116        public String usage() {
117                StringBuilder buf = new StringBuilder();
118
119                buf.append("Process_QueueReceiveは、MQ or SQSにメッセージキューを受信する、").append(CR);
120                buf.append("FirstProcessインタフェースの実装クラスです。").append(CR);
121                buf.append(CR);
122                buf.append("-queueType=キュータイプ       :MQ or SQS").append(CR);
123                buf.append("-jmsServer=キューサーバー     :キューサーバーのURLを指定").append(CR);
124                buf.append("-groupId=グループID           :キュー格納先のグループID").append(CR);
125                buf.append("-webinfDir=WEB-INFパス        :WEB-INFのディレクトリパス(bizlogic用)").append(CR);
126                buf.append("-bizsrcDir=bizファイルパス    :bizファイルディレクトリパス(bizlogic用)").append(CR);
127                buf.append("-systemId=システムID          :システムID(bizlogic用)").append(CR);
128                buf.append("-logicName=ロジックファイル名 :bizLogicのファイル名(bizlogic用)").append(CR);
129                buf.append("[-sccessKey=アクセスキー]     :SQSに接続用のアクセスキーです(aws上で取得)").append(CR);
130                buf.append("[-secretKey=シークレットキー] :SQSに接続用のシークレットキーです(aws上で取得)").append(CR);
131                buf.append( CR ).append( CR );
132                buf.append( getArgument().usage() ).append( CR );
133
134                return null;
135        }
136
137        /**
138         * プロセスの初期化を行います。初めに一度だけ、呼び出されます。
139         * 初期処理(ファイルオープン、DBオープン等)に使用します。
140         *
141         * @param   paramProcess データベースの接続先情報などを持っているオブジェクト
142         */
143        @Override
144        public void init(ParamProcess paramProcess) {
145                Argument arg = getArgument();
146
147                queueType = arg.getProparty("queueType");
148                jmsServer = arg.getProparty("jmsServer");
149                groupId = arg.getProparty("groupId");
150                systemId = arg.getProparty("systemId");
151                logicName = arg.getProparty("logicName");
152                final String accessKey = arg.getProparty("accessKey");
153                final String secretKey = arg.getProparty("secretKey");
154
155                queueReceive = QueueReceiveFactory.newQueueReceive(queueType);
156
157                queueReceive.setBatchFlg(true);
158
159                queueReceive.connect(jmsServer, accessKey, secretKey);
160        }
161
162        /**
163         * プロセスの終了を行います。最後に一度だけ、呼び出されます。
164         * 終了処理(ファイルクローズ、DBクローズ等)に使用します。
165         *
166         * @param   isOK トータルで、OKだったかどうか[true:成功/false:失敗]
167         */
168        @Override
169        public void end(boolean isOK) {
170                queueType = "";
171                jmsServer = "";
172                groupId = "";
173                systemId = "";
174                logicName = "";
175                receiveMessage = "";
176
177                if(queueReceive != null) {
178                        queueReceive.close();
179                }
180        }
181
182        /**
183         * このデータの処理において、次の処理が出来るかどうかを問い合わせます。
184         * この呼び出し1回毎に、次のデータを取得する準備を行います。
185         *
186         * @return 処理できる:true / 処理できない:false
187         **/
188        @Override
189        public boolean next() {
190
191                QueueInfo queueInfo = queueReceive.receive(groupId);
192
193                if(queueInfo != null) {
194                        receiveMessage = queueInfo.getMessage();
195
196                        try {
197                                // bizlogic処理
198                                callActBizLogic(systemId, logicName, receiveMessage);
199                        } catch (Throwable te) {
200                                String errMsg = "bizlogicの実行に失敗しました。" + te.getMessage();
201                                throw new RuntimeException(errMsg);
202                        }
203                }else {
204                        String errMsg = "メッセージキューが登録されていません。";
205                        throw new RuntimeException(errMsg);
206                }
207                return false;
208        }
209
210        /**
211         * bizLogic処理の呼び出し
212         * 必要なパス情報をリソースから取得して、
213         * BizUtil.actBizLogicにパス情報を渡すことで、
214         * bizLogicの処理を行います。
215         *
216         * @param systemId  システムID
217         * @param logicName ロジックファイル名
218         * @param msgText   メッセージ
219         * @throws Throwable エラー情報
220         */
221        private void callActBizLogic(String systemId, String logicName, String msgText) throws Throwable {
222                Argument arg = getArgument();
223                final String webinfDir = arg.getProparty("webinfDir");
224                final String srcDir = arg.getProparty("bizsrcDir");
225
226                // 対象 クラスパスの生成
227                // HotDeploy機能を使用する場合に、Javaクラスをコンパイルするためのクラスパスを設定します。
228                // 対象となるクラスパスは、WEB-INF/classes 及び WEB-INF/lib/*.jar です。
229                // bizLogicTag.javaのコードを移植
230                StringBuilder sb = new StringBuilder();
231                sb.append('.').append(File.pathSeparatorChar);
232
233                File lib = new File(webinfDir);
234                File[] libFiles = lib.listFiles();
235                for (int i = 0; i < libFiles.length; i++) {
236                        sb.append(libFiles[i].getAbsolutePath()).append(File.pathSeparatorChar);
237                }
238                final String classDir = webinfDir + File.pathSeparator + "classes";
239                sb.append(classDir).append(File.pathSeparatorChar);
240                String classPath = sb.toString();
241
242                boolean isAutoCompile = true;
243                boolean isHotDeploy = true;
244
245                // bizLogicに渡すパラメータ
246                String[] keys = new String[] { "message" };
247                String[] vals = new String[] { msgText };
248
249                // bizLogic処理の実行
250                BizUtil.actBizLogic(srcDir, classDir, isAutoCompile, isHotDeploy, classPath, systemId, logicName, keys, vals);
251        }
252
253        /**
254         * 最初に、 行データである LineModel を作成します
255         * FirstProcess は、次々と処理をチェインしていく最初の行データを
256         * 作成して、後続の ChainProcess クラスに処理データを渡します。
257         *
258         * @param rowNo 処理中の行番号
259         *
260         * @return 処理変換後のLineModel
261         * */
262        @Override
263        public LineModel makeLineModel(int rowNo) {
264                // 後続のChainProcessは実行しません。
265                return null;
266        }
267
268        /**
269         * プロセスの処理結果のレポート表現を返します。
270         * 処理プログラム名、入力件数、出力件数などの情報です。
271         * この文字列をそのまま、標準出力に出すことで、結果レポートと出来るような
272         * 形式で出してください。
273         *
274         * @return   処理結果のレポート
275         */
276        @Override
277        public String report() {
278                final String report = "[" + getClass().getName() + "]" + CR
279                                + TAB + "queueType:" + queueType + CR
280                                + TAB + "jmsServer:" + jmsServer + CR
281                                + TAB + "gropuId:" + groupId + CR
282                                + TAB + "systemId:" + systemId + CR
283                                + TAB + "logicName" + logicName + CR
284                                + TAB + "receiveMessage:" + receiveMessage + CR
285                                ;
286                return report;
287        }
288
289        /**
290         * このクラスは、main メソッドから実行できません。
291         *
292         * @param args コマンド引数配列
293         */
294        public static void main(final String[] args) {
295                LogWriter.log(new Process_QueueReceive().usage());
296        }
297}