001/* 002 * Copyright (c) 2009 The openGion Project. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, 013 * either express or implied. See the License for the specific language 014 * governing permissions and limitations under the License. 015 */ 016package org.opengion.fukurou.process; 017 018import org.opengion.fukurou.util.Argument; 019import org.opengion.fukurou.util.SystemParameter; 020import org.opengion.fukurou.util.LogWriter; 021 022import org.opengion.fukurou.util.HybsEntry ; 023import org.opengion.fukurou.util.Closer; 024import org.opengion.fukurou.db.ConnectionFactory; 025 026import java.util.Set ; 027import java.util.HashSet ; 028import java.util.Map ; 029import java.util.LinkedHashMap ; 030 031import java.sql.Connection; 032import java.sql.Statement; 033import java.sql.ResultSet; 034import java.sql.SQLException; 035 036/** 037 * Process_BulkQueryは、データベースから読み取った内容を、一括処理するために、 038 * ParamProcess のサブクラス(Process_DBParam)にセットしたり、加工したりする 039 * FirstProcess と、ChainProcess のインターフェースを両方持った、実装クラスです。 040 * 041 * このクラスは、上流から、下流への処理は、1度しか実行されません。 042 * FirstProcess の検索結果は、Set オブジェクトとして、Process_DBParam に渡します。 043 * ChainProcess は、その結果を取り出し、自分自身の処理結果と合せて加工します。 044 * 045 * FirstProcess では、-action は、query のみです。 046 * query は、指定のSQL文を実行し、結果のSetをParamProcessに設定します。 047 * ChainProcess では、-action は、query、bulkSet、minus、intersect が指定できます。 048 * query は、上記と同じです。 049 * minus は、先のSetから、SQL文の実行結果を引き算し、結果Setを再設定します。 050 * intersect は、先のSetから、SQL文の実行結果と重複する結果Setを再設定します。 051 * bulkSet は、先のSetを取り出し、SQL文に加味して処理します。 052 * 流れ的には、query で検索し、minusまたはintersect でSetオブジェクトを加工し、bulkSet で 053 * 利用します。例えば、ORACLEから、ユニークキーのSetを作成し、SQLServerのユニークキーを 054 * minusした結果を、ORACLEからDELETEすれば、不要なデータを削除するなどの処理が実行可能になります。 055 * また、単純に、query だけを、チェインすれば、単発のUPDATE文を実行することが可能です。 056 * 057 * データベース接続先等は、ParamProcess のサブクラス(Process_DBParam)に 058 * 設定された接続(Connection)を使用します。 059 * DBID は、Process_DBParam の -configFile で指定する DBConfig.xml ファイルを使用します。 060 * 061 * 引数文字列中にスペースを含む場合は、ダブルコーテーション("") で括って下さい。 062 * 引数文字列の 『=』の前後には、スペースは挟めません。必ず、-key=value の様に 063 * 繋げてください。 064 * 065 * SQL文には、{@DATE.YMDH}等のシステム変数が使用できます。 066 * 067 * @og.formSample 068 * Process_BulkQuery -action=query -dbid=DBGE -sql="select KEY from TABLE_X" 069 * 070 * -action=処理方法(必須) : 実行する処理方法を指定します 071 * -action=query 単なるSQL文を実行します。 072 * -action=bulkSet 実行したSQL文の結果を、Set<String> オブジェクトに設定します。 073 * -action=minus Set<String> オブジェクトと、ここでの実行結果の差分をとります。 074 * -action=intersect Set<String> オブジェクトと、ここでの実行結果の積分をとります。 075 * [ -dbid=DB接続ID ] : -dbid=DBGE (例: Process_DBParam の -configFile で指定する DBConfig.xml ファイルで規定) 076 * [ -sql=検索SQL文 ] : -sql="select * from GEA08" 077 * [ -sqlFile=検索SQLファイル ] : -sqlFile=select.sql 078 * -sql= を指定しない場合は、ファイルで必ず指定してください。 079 * [ -sql_XXXX=固定値 ] : -sql_SYSTEM_ID=GE 080 * SQL文中の{@XXXX}文字列を指定の固定値で置き換えます。 081 * WHERE SYSTEM_ID='{@SYSTEM_ID}' ⇒ WHERE SYSTEM_ID='GE' 082 * [ -bulkKey=XXXX ] : -bulkKey=XXXX 083 * SQL文中の{@XXXX}文字列をProcess_BulkQuery等で取得した値で置き換えます。 084 * WHERE SYSTEM_ID IN ( {@XXXX} ) ⇒ WHERE SYSTEM_ID IN ( 'AA','BB','CC' ) 085 * [ -bulkType=NUM|STR ] : -bulType=STR 086 * Bulkの値を文字列に変換する場合に、数字型か、文字型を指定します。 087 * 数字型では、AA,BB,CC とし、文字型では、'AA','BB','CC' に変換します(初期値:STR)。 088 * [ -fetchSize=100 ] :フェッチする行数(初期値:100) 089 * [ -display=[false/true] ] :結果を標準出力に表示する(true)かしない(false)か(初期値:false[表示しない]) 090 * [ -debug=[false/true] ] :デバッグ情報を標準出力に表示する(true)かしない(false)か(初期値:false[表示しない]) 091 * 092 * @og.rev 5.3.4.0 (2011/04/01) 新規追加 093 * @version 4.0 094 * @author Kazuhiko Hasegawa 095 * @since JDK5.0, 096 */ 097public class Process_BulkQuery extends AbstractProcess implements FirstProcess , ChainProcess { 098 private static final int MAX_BULK_SET = 500 ; // ORACLE の制約が 1000 なので。 099 100 private static final String ACT_QUERY = "query" ; 101 private static final String ACT_BULKSET = "bulkSet" ; 102 private static final String ACT_MINUS = "minus" ; 103 private static final String ACT_INTERSECT = "intersect" ; 104 105 private static final String[] ACTION_LST = new String[] { ACT_QUERY,ACT_BULKSET,ACT_MINUS,ACT_INTERSECT }; 106 107// private LineModel newData = null; 108 109 private String actionCmd = null; // SQL結果を加工(query:実行、minus:引き算、intersect:重複分) 110 private String dbid = null; // メインDB接続ID 111 112 private String bulkKey = null; 113 private boolean bulkType = true; // true:STR , false:NUM 114 115 private int sqlCount = 0; // SQL文の処理件数 116 private int setCount = 0; // 取り出したSetの件数 117 private int outCount = 0; // マージ後のSetの件数 118 119 private int fetchSize = 100; 120 private boolean display = false; // 表示しない 121 private boolean debug = false; // デバッグ情報 122 private boolean firstTime = true; // 最初の一回目 123 124 private static final Map<String,String> mustProparty ; // [プロパティ]必須チェック用 Map 125 private static final Map<String,String> usableProparty ; // [プロパティ]整合性チェック Map 126 127 static { 128 mustProparty = new LinkedHashMap<String,String>(); 129 mustProparty.put( "action", "実行する処理方法を指定します。(query|minus|intersect)" ); 130 131 usableProparty = new LinkedHashMap<String,String>(); 132 usableProparty.put( "dbid", "Process_DBParam の -configFile で指定する DBConfig.xml ファイルで規定" ); 133 usableProparty.put( "sql", "検索SQL文(sql or sqlFile 必須)例: \"select * from GEA08\"" ); 134 usableProparty.put( "sqlFile", "検索SQLファイル(sql or sqlFile 必須)例: select.sql" ); 135 usableProparty.put( "sql_", "SQL文中の{@XXXX}文字列を指定の固定値で置き換えます。" + 136 CR + "WHERE SYSTEM_ID='{@SYSTEM_ID}' ⇒ WHERE SYSTEM_ID='GE'" ); 137 usableProparty.put( "dbid2", "DB接続ID2 例: Process_DBParam の -configFile で指定する DBConfig.xml ファイルで規定" ); 138 usableProparty.put( "sql2", "検索SQL文2(sql or sqlFile 必須)例: \"select * from GEA08\"" ); 139 usableProparty.put( "sqlFile2", "検索SQLファイル2(sql or sqlFile 必須)例: select.sql" ); 140 usableProparty.put( "sql2_", "SQL文2中の{@XXXX}文字列を指定の固定値で置き換えます。" + 141 CR + "WHERE SYSTEM_ID='{@SYSTEM_ID}' ⇒ WHERE SYSTEM_ID='GE'" ); 142 usableProparty.put( "bulkKey", "SQL文中の{@XXXX}文字列をProcess_BulkQuery等で取得した値で置き換えます。" + 143 CR + "WHERE SYSTEM_ID IN ( {@XXXX} ) ⇒ WHERE SYSTEM_ID IN ( 'AA','BB','CC' )" ); 144 usableProparty.put( "bulkType", "Bulkの値を文字列に変換する場合に、文字型か、数字型を指定します。" + 145 CR + "数字型では、AA,BB,CC とし、文字型では、'AA','BB','CC' に変換します。(初期値:STR)" ); 146 usableProparty.put( "fetchSize","フェッチする行数 (初期値:100)" ); 147 usableProparty.put( "display", "結果を標準出力に表示する(true)かしない(false)か" + 148 CR + "(初期値:false:表示しない)" ); 149 usableProparty.put( "debug", "デバッグ情報を標準出力に表示する(true)かしない(false)か" + 150 CR + "(初期値:false:表示しない)" ); 151 } 152 153 /** 154 * デフォルトコンストラクター。 155 * このクラスは、動的作成されます。デフォルトコンストラクターで、 156 * super クラスに対して、必要な初期化を行っておきます。 157 * 158 */ 159 public Process_BulkQuery() { 160 super( "org.opengion.fukurou.process.Process_BulkQuery",mustProparty,usableProparty ); 161 } 162 163 /** 164 * プロセスの初期化を行います。初めに一度だけ、呼び出されます。 165 * 初期処理(ファイルオープン、DBオープン等)に使用します。 166 * 167 * @og.rev 5.3.9.0 (2011/09/01) 1000件を超えた場合の処理を追加 168 * 169 * @param paramProcess データベースの接続先情報などを持っているオブジェクト 170 */ 171 public void init( final ParamProcess paramProcess ) { 172 Argument arg = getArgument(); 173 174 actionCmd = arg.getProparty("action" , null , ACTION_LST ); 175 176 fetchSize = arg.getProparty("fetchSize",fetchSize); 177 display = arg.getProparty("display",display); 178 debug = arg.getProparty("debug",debug); 179// if( debug ) { println( arg.toString() ); } // 5.7.3.0 (2014/02/07) デバッグ情報 180 181 dbid = arg.getProparty("dbid"); 182 String sql = arg.getFileProparty("sql","sqlFile",true); 183 if( debug ) { println( "入力SQL:" + sql ); } 184 185 HybsEntry[] entry =arg.getEntrys( "sql_" ); //配列 186 SystemParameter sysParam = new SystemParameter( sql ); 187 sql = sysParam.replace( entry ); 188 if( debug ) { println( "変換SQL:" + sql ); } 189 190 if( ACT_BULKSET.equalsIgnoreCase( actionCmd ) ) { 191 bulkKey = arg.getProparty("bulkKey"); 192 String bkType = arg.getProparty("bulkType"); 193 if( bkType != null ) { bulkType = bkType.equalsIgnoreCase( "STR" ); } 194 195 Set<String> setData = paramProcess.getBulkData(); 196 if( debug ) { println( setData.toString() ); } 197 setCount = setData.size(); 198 199 if( setCount > 0 ) { 200 // 5.3.9.0 (2011/09/01) 1000件を超えた場合の処理を追加 201// sql = makeBulkQuery( sql,bulkKey,bulkType,setData ); 202// if( debug ) { println( "BulkSQL:" + sql ); } 203// createSetData( paramProcess, dbid, sql ); 204 String[] sqls = makeBulkQuery( sql,bulkKey,bulkType,setData ); 205 for( int i=0; i<sqls.length; i++ ) { 206 if( debug ) { println( "BulkSQL:" + sqls[i] ); } 207 createSetData( paramProcess, dbid, sqls[i] ); 208 } 209 } 210 } 211 else if( ACT_QUERY.equalsIgnoreCase( actionCmd ) ) { 212 Set<String> setData2 = createSetData( paramProcess, dbid, sql ); 213 if( debug ) { println( setData2.toString() ); } 214 setCount = setData2.size(); 215 outCount = setCount; 216 paramProcess.setBulkData( setData2 ); 217 } 218 else { 219 Set<String> setData = paramProcess.getBulkData(); 220 Set<String> setData2 = createSetData( paramProcess, dbid, sql ); 221 setCount = setData2.size(); 222 223 if( ACT_MINUS.equalsIgnoreCase( actionCmd ) ) { 224 setData.removeAll( setData2 ); 225 } 226 else if( ACT_INTERSECT.equalsIgnoreCase( actionCmd ) ) { 227 setData.retainAll( setData2 ); 228 } 229 outCount = setData.size(); 230 if( debug ) { println( setData.toString() ); } 231 paramProcess.setBulkData( setData ); 232 } 233 } 234 235 /** 236 * プロセスの終了を行います。最後に一度だけ、呼び出されます。 237 * 終了処理(ファイルクローズ、DBクローズ等)に使用します。 238 * 239 * @param isOK トータルで、OKだったかどうか [true:成功/false:失敗] 240 */ 241 public void end( final boolean isOK ) { 242 // 何もありません。 243 } 244 245 /** 246 * このデータの処理において、次の処理が出来るかどうかを問い合わせます。 247 * この呼び出し1回毎に、次のデータを取得する準備を行います。 248 * 249 * @return 処理できる:true / 処理できない:false 250 */ 251 public boolean next() { 252 return firstTime; 253 } 254 255 /** 256 * 引数の LineModel を処理するメソッドです。 257 * 変換処理後の LineModel を返します。 258 * 後続処理を行わない場合(データのフィルタリングを行う場合)は、 259 * null データを返します。つまり、null データは、後続処理を行わない 260 * フラグの代わりにも使用しています。 261 * なお、変換処理後の LineModel と、オリジナルの LineModel が、 262 * 同一か、コピー(クローン)かは、各処理メソッド内で決めています。 263 * ドキュメントに明記されていない場合は、副作用が問題になる場合は、 264 * 各処理ごとに自分でコピー(クローン)して下さい。 265 * 266 * @param data オリジナルのLineModel 267 * 268 * @return 処理変換後のLineModel 269 */ 270 @SuppressWarnings(value={"unchecked"}) 271 public LineModel action( final LineModel data ) { 272 return data ; 273 } 274 275 /** 276 * 最初に、 行データである LineModel を作成します 277 * FirstProcess は、次々と処理をチェインしていく最初の行データを 278 * 作成して、後続の ChainProcess クラスに処理データを渡します。 279 * 280 * @param rowNo 処理中の行番号 281 * 282 * @return 処理変換後のLineModel 283 */ 284 public LineModel makeLineModel( final int rowNo ) { 285 firstTime = false; // 一度しか処理しないため、false を設定する。 286 287 LineModel model = new LineModel(); 288 289 model.setRowNo( rowNo ); 290 291 return model; 292 } 293 294 /** 295 * 内部で使用する Set オブジェクトを作成します。 296 * Exception 以外では、必ず Set<String> オブジェクトを返します。 297 * 298 * @og.rev 5.3.9.0 (2011/09/01) 1000件を超えた場合の処理を追加 299 * 300 * @param paramProcess データベースの接続先情報などを持っているオブジェクト 301 * @param dbid 接続先ID 302 * @param sql 実行するSQL文(検索系) 303 * 304 * @return 実行結果から取り出した、最初のカラムのみを集めた Setオブジェクト 305 * @throws RuntimeException データベース処理ができなかった場合。 306 */ 307 private Set<String> createSetData( final ParamProcess paramProcess, final String dbid, final String sql ) { 308 Set<String> data = new HashSet<String>(); 309 310 Connection connection = null; 311 Statement stmt = null; 312 ResultSet resultSet = null; 313 314 try { 315 connection = paramProcess.getConnection( dbid ); 316 stmt = connection.createStatement(); 317 if( fetchSize > 0 ) { stmt.setFetchSize( fetchSize ); } 318 if( stmt.execute( sql ) ) { // true:検索系 , false:更新系 319 resultSet = stmt.getResultSet(); 320 while( resultSet.next() ) { 321 sqlCount++ ; 322 String str = resultSet.getString(1); 323 if( display ) { println( str ); } 324 data.add( str ); 325 } 326 } 327 else { 328// sqlCount = stmt.getUpdateCount(); // 5.3.9.0 (2011/09/01) 329 sqlCount += stmt.getUpdateCount(); 330 } 331 } 332 catch (SQLException ex) { 333 String errMsg = "SQL を実行できませんでした。" + CR 334 + "errMsg=[" + ex.getMessage() + "]" + CR 335 + "errorCode=[" + ex.getErrorCode() + "] State=[" + ex.getSQLState() + "]" + CR 336 + "DBID=" + dbid + CR 337 + "SQL =" + sql ; 338 339 throw new RuntimeException( errMsg,ex ); 340 } 341 finally { 342 Closer.resultClose( resultSet ); 343 Closer.stmtClose( stmt ); 344 345 ConnectionFactory.remove( connection,dbid ); 346 } 347 return data; 348 } 349 350 /** 351 * 内部で使用する Set オブジェクトを作成します。 352 * Exception 以外では、必ず Set<String[]> オブジェクトを返します。 353 * 354 * @og.rev 5.3.9.0 (2011/09/01) 1000件を超えた場合の処理を追加 355 * 356 * @param sql オリジナルのSQL文 357 * @param bulkKey 一括処理で置き換えるキー文字列 358 * @param bulkType 文字型(true)か、数字型(false)を指定 359 * @param setData 一括処理の元となるSetオブジェクト 360 * 361 * @return オリジナルのSQL文 に 一括処理の文字列と置換したSQL文の配列 362 */ 363 private String[] makeBulkQuery( final String sql, final String bulkKey, final boolean bulkType,final Set<String> setData ) { 364 String[] sqls = new String[ (setData.size()/MAX_BULK_SET) + 1 ]; 365 int idx = 0; 366 int cnt = 0; 367 368 StringBuilder buf = new StringBuilder(); 369 String bulkVal = null; 370 if( bulkType ) { // 文字列の場合 371 for( String key : setData ) { 372 cnt++; 373 buf.append( ",'" ).append( key ).append( "'" ); 374 if( cnt >= MAX_BULK_SET ) { 375 bulkVal = buf.substring( 1 ); // 先頭のコロンをはずす 376 sqls[idx++] = sql.replace( "{@" + bulkKey + "}" ,bulkVal ); 377 cnt = 0; 378 buf = new StringBuilder(); 379 } 380 } 381 if( cnt > 0 ) { // きっちりで終わらない場合 382 bulkVal = buf.substring( 1 ); // 先頭のコロンをはずす 383 sqls[idx] = sql.replace( "{@" + bulkKey + "}" ,bulkVal ); 384 } 385 } 386 else { // 数字の場合 387 for( String key : setData ) { 388 cnt++; 389 buf.append( "," ).append( key ); 390 if( cnt >= MAX_BULK_SET ) { 391 bulkVal = buf.substring( 1 ); // 先頭のコロンをはずす 392 sqls[idx++] = sql.replace( "{@" + bulkKey + "}" ,bulkVal ); 393 cnt = 0; 394 buf = new StringBuilder(); 395 } 396 } 397 if( cnt > 0 ) { // きっちりで終わらない場合 398 bulkVal = buf.substring( 1 ); // 先頭のコロンをはずす 399 sqls[idx] = sql.replace( "{@" + bulkKey + "}" ,bulkVal ); 400 } 401 } 402// String bulkVal = buf.substring( 1 ); // 先頭のコロンをはずす 403 404// return sql.replace( "{@" + bulkKey + "}" ,bulkVal ); 405 return sqls; 406 } 407 408 /** 409 * プロセスの処理結果のレポート表現を返します。 410 * 処理プログラム名、入力件数、出力件数などの情報です。 411 * この文字列をそのまま、標準出力に出すことで、結果レポートと出来るような 412 * 形式で出してください。 413 * 414 * @return 処理結果のレポート 415 */ 416 public String report() { 417 String report = "[" + getClass().getName() + "]" + CR 418 + TAB + "Action : " + actionCmd + CR 419 + TAB + "DBID : " + dbid + CR 420 + TAB + "sqlCount : " + sqlCount + CR 421 + TAB + "setCount : " + setCount + CR 422 + TAB + "outCount : " + outCount ; 423 424 return report ; 425 } 426 427 /** 428 * このクラスの使用方法を返します。 429 * 430 * @return このクラスの使用方法 431 */ 432 public String usage() { 433 StringBuilder buf = new StringBuilder(); 434 435 buf.append( "Process_BulkQueryは、データベースから読み取った内容を、一括処理するために、" ).append( CR ); 436 buf.append( "ParamProcess のサブクラス(Process_DBParam)にセットしたり、加工したりする" ).append( CR ); 437 buf.append( "FirstProcess と、ChainProcess のインターフェースを両方持った、実装クラスです。" ).append( CR ); 438 buf.append( CR ); 439 buf.append( "このクラスは、上流から、下流への処理は、1度しか実行されません。" ).append( CR ); 440 buf.append( "FirstProcess の検索結果は、Set オブジェクトとして、Process_DBParam に渡します。" ).append( CR ); 441 buf.append( "ChainProcess は、その結果を取り出し、自分自身の処理結果と合せて加工します。" ).append( CR ); 442 buf.append( CR ); 443 buf.append( "FirstProcess では、-action は、query のみです。" ).append( CR ); 444 buf.append( " query は、指定のSQL文を実行し、結果のSetをParamProcessに設定します。" ).append( CR ); 445 buf.append( "ChainProcess では、-action は、query、bulkSet、minus、intersect が指定できます。" ).append( CR ); 446 buf.append( " query は、上記と同じです。" ).append( CR ); 447 buf.append( " minus は、先のSetから、SQL文の実行結果を引き算し、結果Setを再設定します。" ).append( CR ); 448 buf.append( " intersect は、先のSetから、SQL文の実行結果と重複する結果Setを再設定します。" ).append( CR ); 449 buf.append( " bulkSet は、先のSetを取り出し、SQL文に加味して処理します。" ).append( CR ); 450 buf.append( CR ); 451 buf.append( "流れ的には、query で検索し、minusまたはintersect でSetオブジェクトを加工し、" ).append( CR ); 452 buf.append( "bulkSet で利用します。例えば、ORACLEから、ユニークキーのSetを作成し、" ).append( CR ); 453 buf.append( "SQLServerのユニークキーをminusした結果を、ORACLEからDELETEすれば、不要な" ).append( CR ); 454 buf.append( "データを削除するなどの処理が実行可能になります。また、単純に、query だけを、" ).append( CR ); 455 buf.append( "チェインすれば、単発のUPDATE文を実行することが可能です。" ).append( CR ); 456 buf.append( CR ); 457 buf.append( "データベース接続先等は、ParamProcess のサブクラス(Process_DBParam)に" ).append( CR ); 458 buf.append( "設定された接続(Connection)を使用します。" ).append( CR ); 459 buf.append( CR ); 460 buf.append( "引数文字列中に空白を含む場合は、ダブルコーテーション(\"\") で括って下さい。" ).append( CR ); 461 buf.append( "引数文字列の 『=』の前後には、空白は挟めません。必ず、-key=value の様に" ).append( CR ); 462 buf.append( "繋げてください。" ).append( CR ); 463 buf.append( CR ); 464 buf.append( "SQL文には、{@DATE.YMDH}等のシステム変数が使用できます。" ).append( CR ); 465 buf.append( CR ).append( CR ); 466 467 buf.append( getArgument().usage() ).append( CR ); 468 469 return buf.toString(); 470 } 471 472 /** 473 * このクラスは、main メソッドから実行できません。 474 * 475 * @param args コマンド引数配列 476 */ 477 public static void main( final String[] args ) { 478 LogWriter.log( new Process_BulkQuery().usage() ); 479 } 480}