001 /* 002 * Copyright (c) 2009 The openGion Project. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, 013 * either express or implied. See the License for the specific language 014 * governing permissions and limitations under the License. 015 */ 016 package org.opengion.fukurou.process; 017 018 import org.opengion.fukurou.util.Argument; 019 import org.opengion.fukurou.util.SystemParameter; 020 import org.opengion.fukurou.util.LogWriter; 021 022 import org.opengion.fukurou.util.HybsEntry ; 023 import org.opengion.fukurou.util.Closer; 024 import org.opengion.fukurou.db.ConnectionFactory; 025 026 import java.util.Set ; 027 import java.util.HashSet ; 028 import java.util.Map ; 029 import java.util.LinkedHashMap ; 030 031 import java.sql.Connection; 032 import java.sql.Statement; 033 import java.sql.ResultSet; 034 import java.sql.SQLException; 035 036 /** 037 * Process_BulkQueryは、データベ?スから読み取った?容を??処?るために? 038 * ParamProcess のサブクラス(Process_DBParam)にセ?したり??したりす? 039 * FirstProcess と、ChainProcess のインターフェースを両方持った?実?ラスです? 040 * 041 * こ?クラスは、上流から?下流への処???度しか実行されません? 042 * FirstProcess の検索結果は、Set オブジェクトとして、Process_DBParam に渡します? 043 * ChainProcess は、その結果を取り?し?自??身の処?果と合せて?します? 044 * 045 * FirstProcess では?action は、query のみです? 046 * query は、指定?SQL?実行し、結果のSetをParamProcessに設定します? 047 * ChainProcess では?action は、query、bulkSet、minus、intersect が指定できます? 048 * query は、上記と同じです? 049 * minus は、?のSetから、SQL??実行結果を引き算し、結果Setを?設定します? 050 * intersect は、?のSetから、SQL??実行結果と重?る結果Setを?設定します? 051 * bulkSet は、?のSetを取り?し?SQL??して処?ます? 052 * 流れ?は、query で検索し?minusまた?intersect でSetオブジェクトを?し?bulkSet で 053 * 利用します?例えば、ORACLEから、ユニ?クキーのSetを作?し?SQLServerのユニ?クキー? 054 * minusした結果を?ORACLEからDELETEすれば、不要な??タを削除するなどの処?実行可能になります? 055 * また?単純に、query ?を?チェインすれば、単発のUPDATE?実行することが可能です? 056 * 057 * ??タベ?ス接続?等?、ParamProcess のサブクラス(Process_DBParam)に 058 * 設定された接?Connection)を使用します? 059 * DBID は、Process_DBParam の -configFile で?す?DBConfig.xml ファイルを使用します? 060 * 061 * 引数??中にスペ?スを含??合?、ダブルコー??ション("") で括って下さ?? 062 * 引数??の ?』?前後には、スペ?スは挟めません。??key=value の様に 063 * 繋げてください? 064 * 065 * SQL?は、{@DATE.YMDH}等?シス?変数が使用できます? 066 * 067 * @og.formSample 068 * Process_BulkQuery -action=query -dbid=DBGE -sql="select KEY from TABLE_X" 069 * 070 * -action=処????) ??実行する??法を?しま? 071 * -action=query 単なるSQL?実行します? 072 * -action=bulkSet 実行したSQL??結果を?Set<String> オブジェクトに設定します? 073 * -action=minus Set<String> オブジェクトと、ここでの実行結果の差?とります? 074 * -action=intersect Set<String> オブジェクトと、ここでの実行結果の積?をとります? 075 * [ -dbid=DB接続ID ] ??-dbid=DBGE (? Process_DBParam の -configFile で?す?DBConfig.xml ファイルで規? 076 * [ -sql=検索SQL? ] ??-sql="select * from GEA08" 077 * [ -sqlFile=検索SQLファイル ] ??-sqlFile=select.sql 078 * -sql= を指定しな??合?、ファイルで??してください? 079 * [ -sql_XXXX=固定? ] ??-sql_SYSTEM_ID=GE 080 * SQL?の{@XXXX}??を指定?固定?で置き換えます? 081 * WHERE SYSTEM_ID='{@SYSTEM_ID}' ?WHERE SYSTEM_ID='GE' 082 * [ -bulkKey=XXXX ] ??-bulkKey=XXXX 083 * SQL?の{@XXXX}??をProcess_BulkQuery等で取得した?で置き換えます? 084 * WHERE SYSTEM_ID IN ( {@XXXX} ) ?WHERE SYSTEM_ID IN ( 'AA','BB','CC' ) 085 * [ -bulkType=NUM|STR ] ??-bulType=STR 086 * Bulkの値を文字?に変換する場合に、数字型か??型を指定します? 087 * 数字型では、AA,BB,CC とし??型では?AA','BB','CC' に変換しま?初期値:STR)? 088 * [ -fetchSize=100 ] ?フェ?する行数(初期値:100) 089 * [ -display=false|true ] ?結果を標準?力に表示する(true)かしな?false)?初期値:false[表示しない]) 090 * [ -debug=false|true ] ?デバッグ??を標準?力に表示する(true)かしな?false)?初期値:false[表示しない]) 091 * 092 * @og.rev 5.3.4.0 (2011/04/01) 新規追? 093 * @version 4.0 094 * @author Kazuhiko Hasegawa 095 * @since JDK5.0, 096 */ 097 public class Process_BulkQuery extends AbstractProcess implements FirstProcess , ChainProcess { 098 private static final int MAX_BULK_SET = 500 ; // ORACLE の制? 1000 なので? 099 100 private static final String ACT_QUERY = "query" ; 101 private static final String ACT_BULKSET = "bulkSet" ; 102 private static final String ACT_MINUS = "minus" ; 103 private static final String ACT_INTERSECT = "intersect" ; 104 105 private static final String[] ACTION_LST = new String[] { ACT_QUERY,ACT_BULKSET,ACT_MINUS,ACT_INTERSECT }; 106 107 // private LineModel newData = null; 108 109 private String actionCmd = null; // SQL結果を加工(query:実行?minus:引き算?intersect:重??) 110 private String dbid = null; // メインDB接続ID 111 112 private String bulkKey = null; 113 private boolean bulkType = true; // true:STR , false:NUM 114 115 private int sqlCount = 0; // SQL??処?数 116 private int setCount = 0; // 取り出したSetの件数 117 private int outCount = 0; // マ?ジ後?Setの件数 118 119 private int fetchSize = 100; 120 private boolean display = false; // 表示しな? 121 private boolean debug = false; // ???? 122 private boolean firstTime = true; // ??の?目 123 124 private static final Map<String,String> mustProparty ; // ?プロパティ???チェ?用 Map 125 private static final Map<String,String> usableProparty ; // ?プロパティ?整合?チェ? Map 126 127 static { 128 mustProparty = new LinkedHashMap<String,String>(); 129 mustProparty.put( "action", "実行する??法を?します?(query|minus|intersect)" ); 130 131 usableProparty = new LinkedHashMap<String,String>(); 132 usableProparty.put( "dbid", "Process_DBParam の -configFile で?す?DBConfig.xml ファイルで規? ); 133 usableProparty.put( "sql", "検索SQL?sql or sqlFile ??)? \"select * from GEA08\"" ); 134 usableProparty.put( "sqlFile", "検索SQLファイル(sql or sqlFile ??)? select.sql" ); 135 usableProparty.put( "sql_", "SQL?の{@XXXX}??を指定?固定?で置き換えます?" + 136 CR + "WHERE SYSTEM_ID='{@SYSTEM_ID}' ?WHERE SYSTEM_ID='GE'" ); 137 usableProparty.put( "dbid2", "DB接続ID2 ? Process_DBParam の -configFile で?す?DBConfig.xml ファイルで規? ); 138 usableProparty.put( "sql2", "検索SQL?(sql or sqlFile ??)? \"select * from GEA08\"" ); 139 usableProparty.put( "sqlFile2", "検索SQLファイル2(sql or sqlFile ??)? select.sql" ); 140 usableProparty.put( "sql2_", "SQL?中の{@XXXX}??を指定?固定?で置き換えます?" + 141 CR + "WHERE SYSTEM_ID='{@SYSTEM_ID}' ?WHERE SYSTEM_ID='GE'" ); 142 usableProparty.put( "bulkKey", "SQL?の{@XXXX}??をProcess_BulkQuery等で取得した?で置き換えます?" + 143 CR + "WHERE SYSTEM_ID IN ( {@XXXX} ) ?WHERE SYSTEM_ID IN ( 'AA','BB','CC' )" ); 144 usableProparty.put( "bulkType", "Bulkの値を文字?に変換する場合に、文字型か?数字型を指定します?" + 145 CR + "数字型では、AA,BB,CC とし??型では?AA','BB','CC' に変換します?(初期値:STR)" ); 146 usableProparty.put( "fetchSize","フェ?する行数 (初期値:100)" ); 147 usableProparty.put( "display", "結果を標準?力に表示する(true)かしな?false)? + 148 CR + "(初期値:false:表示しな?" ); 149 usableProparty.put( "debug", "????を標準?力に表示する(true)かしな?false)? + 150 CR + "(初期値:false:表示しな?" ); 151 } 152 153 /** 154 * ?ォルトコンストラクター? 155 * こ?クラスは、動??されます??ォルトコンストラクターで? 156 * super クラスに対して、?な初期化を行っておきます? 157 * 158 */ 159 public Process_BulkQuery() { 160 super( "org.opengion.fukurou.process.Process_BulkQuery",mustProparty,usableProparty ); 161 } 162 163 /** 164 * プロセスの初期化を行います?初めに??、呼び出されます? 165 * 初期処?ファイルオープン??オープン?に使用します? 166 * 167 * @og.rev 5.3.9.0 (2011/09/01) 1000件を?た?合?処?追? 168 * 169 * @param paramProcess ??タベ?スの接続???などを持って?オブジェク? 170 */ 171 public void init( final ParamProcess paramProcess ) { 172 Argument arg = getArgument(); 173 174 actionCmd = arg.getProparty("action" , null , ACTION_LST ); 175 176 fetchSize = arg.getProparty("fetchSize",fetchSize); 177 display = arg.getProparty("display",display); 178 debug = arg.getProparty("debug",debug); 179 180 dbid = arg.getProparty("dbid"); 181 String sql = arg.getFileProparty("sql","sqlFile",true); 182 if( debug ) { println( "入力SQL:" + sql ); } 183 184 HybsEntry[] entry =arg.getEntrys( "sql_" ); //配? 185 SystemParameter sysParam = new SystemParameter( sql ); 186 sql = sysParam.replace( entry ); 187 if( debug ) { println( "変換SQL:" + sql ); } 188 189 if( ACT_BULKSET.equalsIgnoreCase( actionCmd ) ) { 190 bulkKey = arg.getProparty("bulkKey"); 191 String bkType = arg.getProparty("bulkType"); 192 if( bkType != null ) { bulkType = bkType.equalsIgnoreCase( "STR" ); } 193 194 Set<String> setData = paramProcess.getBulkData(); 195 if( debug ) { println( setData.toString() ); } 196 setCount = setData.size(); 197 198 if( setCount > 0 ) { 199 // 5.3.9.0 (2011/09/01) 1000件を?た?合?処?追? 200 // sql = makeBulkQuery( sql,bulkKey,bulkType,setData ); 201 // if( debug ) { println( "BulkSQL:" + sql ); } 202 // createSetData( paramProcess, dbid, sql ); 203 String[] sqls = makeBulkQuery( sql,bulkKey,bulkType,setData ); 204 for( int i=0; i<sqls.length; i++ ) { 205 if( debug ) { println( "BulkSQL:" + sqls[i] ); } 206 createSetData( paramProcess, dbid, sqls[i] ); 207 } 208 } 209 } 210 else if( ACT_QUERY.equalsIgnoreCase( actionCmd ) ) { 211 Set<String> setData2 = createSetData( paramProcess, dbid, sql ); 212 if( debug ) { println( setData2.toString() ); } 213 setCount = setData2.size(); 214 outCount = setCount; 215 paramProcess.setBulkData( setData2 ); 216 } 217 else { 218 Set<String> setData = paramProcess.getBulkData(); 219 Set<String> setData2 = createSetData( paramProcess, dbid, sql ); 220 setCount = setData2.size(); 221 222 if( ACT_MINUS.equalsIgnoreCase( actionCmd ) ) { 223 setData.removeAll( setData2 ); 224 } 225 else if( ACT_INTERSECT.equalsIgnoreCase( actionCmd ) ) { 226 setData.retainAll( setData2 ); 227 } 228 outCount = setData.size(); 229 if( debug ) { println( setData.toString() ); } 230 paramProcess.setBulkData( setData ); 231 } 232 } 233 234 /** 235 * プロセスの終?行います??に??、呼び出されます? 236 * 終???ファイルクローズ??クローズ?に使用します? 237 * 238 * @param isOK ト?タルで、OK?たかど? [true:成功/false:失敗] 239 */ 240 public void end( final boolean isOK ) { 241 // 何もありません? 242 } 243 244 /** 245 * こ???タの処?おいて、次の処?出来るかど?を問?わせます? 246 * こ?呼び出し1回毎に、次の??タを取得する準備を行います? 247 * 248 * @return 処?きる:true / 処?きな?false 249 */ 250 public boolean next() { 251 return firstTime; 252 } 253 254 /** 255 * 引数の LineModel を??るメソ?です? 256 * 変換処?? LineModel を返します? 257 * 後続??行わな?????タのフィルタリングを行う場?は? 258 * null ??タを返します?つまり?null ??タは、後続??行わな? 259 * フラグの代わりにも使用して?す? 260 * なお?変換処?? LineModel と、オリジナルの LineModel が? 261 * 同?、コピ?(クローン)か?、各処?ソ??決めて?す? 262 * ドキュメントに明記されて???合?、副作用が問題になる?合?? 263 * ???とに自?コピ?(クローン)して下さ?? 264 * 265 * @param data オリジナルのLineModel 266 * 267 * @return 処?換後?LineModel 268 */ 269 @SuppressWarnings(value={"unchecked"}) 270 public LineModel action( final LineModel data ) { 271 return data ; 272 } 273 274 /** 275 * ??に?行データである LineModel を作?しま? 276 * FirstProcess は、次?処?チェインして???の行データ? 277 * 作?して、後続? ChainProcess クラスに処?ータを渡します? 278 * 279 * @param rowNo 処?の行番号 280 * 281 * @return 処?換後?LineModel 282 */ 283 public LineModel makeLineModel( final int rowNo ) { 284 firstTime = false; // ?しか処?な?め?false を設定する? 285 286 LineModel model = new LineModel(); 287 288 model.setRowNo( rowNo ); 289 290 return model; 291 } 292 293 /** 294 * ?で使用する Set オブジェクトを作?します? 295 * Exception 以外では、? Set<String> オブジェクトを返します? 296 * 297 * @og.rev 5.3.9.0 (2011/09/01) 1000件を?た?合?処?追? 298 * 299 * @param paramProcess ??タベ?スの接続???などを持って?オブジェク? 300 * @param dbid 接続?ID 301 * @param sql 実行するSQL?検索系) 302 * 303 * @return 実行結果から取り出した、最初?カラ??みを集めた Setオブジェク? 304 * @throws RuntimeException ??タベ?ス処?できなかった?合? 305 */ 306 private Set<String> createSetData( final ParamProcess paramProcess, final String dbid, final String sql ) { 307 Set<String> data = new HashSet<String>(); 308 309 Connection connection = null; 310 Statement stmt = null; 311 ResultSet resultSet = null; 312 313 try { 314 connection = paramProcess.getConnection( dbid ); 315 stmt = connection.createStatement(); 316 if( fetchSize > 0 ) { stmt.setFetchSize( fetchSize ); } 317 if( stmt.execute( sql ) ) { // true:検索系 , false:更新系 318 resultSet = stmt.getResultSet(); 319 while( resultSet.next() ) { 320 sqlCount++ ; 321 String str = resultSet.getString(1); 322 if( display ) { println( str ); } 323 data.add( str ); 324 } 325 } 326 else { 327 // sqlCount = stmt.getUpdateCount(); // 5.3.9.0 (2011/09/01) 328 sqlCount += stmt.getUpdateCount(); 329 } 330 } 331 catch (SQLException ex) { 332 String errMsg = "SQL を実行できませんでした? + CR 333 + "DBID=" + dbid + CR 334 + "SQL =" + sql ; 335 throw new RuntimeException( errMsg,ex ); 336 } 337 finally { 338 Closer.resultClose( resultSet ); 339 Closer.stmtClose( stmt ); 340 341 ConnectionFactory.remove( connection,dbid ); 342 } 343 return data; 344 } 345 346 /** 347 * ?で使用する Set オブジェクトを作?します? 348 * Exception 以外では、? Set<String[]> オブジェクトを返します? 349 * 350 * @og.rev 5.3.9.0 (2011/09/01) 1000件を?た?合?処?追? 351 * 352 * @param sql オリジナルのSQL? 353 * @param bulkKey ?処?置き換えるキー?? 354 * @param bulkType ?型(true)か?数字型(false)を指? 355 * @param setData ?処???なるSetオブジェク? 356 * 357 * @return オリジナルのSQL?に ?処????と置換したSQL??配? 358 */ 359 private String[] makeBulkQuery( final String sql, final String bulkKey, final boolean bulkType,final Set<String> setData ) { 360 String[] sqls = new String[ (setData.size()/MAX_BULK_SET) + 1 ]; 361 int idx = 0; 362 int cnt = 0; 363 364 StringBuilder buf = new StringBuilder(); 365 String bulkVal = null; 366 if( bulkType ) { // ??の場? 367 for( String key : setData ) { 368 cnt++; 369 buf.append( ",'" ).append( key ).append( "'" ); 370 if( cnt >= MAX_BULK_SET ) { 371 bulkVal = buf.substring( 1 ); // 先?のコロンを?ずす 372 sqls[idx++] = sql.replace( "{@" + bulkKey + "}" ,bulkVal ); 373 cnt = 0; 374 buf = new StringBuilder(); 375 } 376 } 377 if( cnt > 0 ) { // きっちりで終わらな??? 378 bulkVal = buf.substring( 1 ); // 先?のコロンを?ずす 379 sqls[idx] = sql.replace( "{@" + bulkKey + "}" ,bulkVal ); 380 } 381 } 382 else { // 数字?場? 383 for( String key : setData ) { 384 cnt++; 385 buf.append( "," ).append( key ); 386 if( cnt >= MAX_BULK_SET ) { 387 bulkVal = buf.substring( 1 ); // 先?のコロンを?ずす 388 sqls[idx++] = sql.replace( "{@" + bulkKey + "}" ,bulkVal ); 389 cnt = 0; 390 buf = new StringBuilder(); 391 } 392 } 393 if( cnt > 0 ) { // きっちりで終わらな??? 394 bulkVal = buf.substring( 1 ); // 先?のコロンを?ずす 395 sqls[idx] = sql.replace( "{@" + bulkKey + "}" ,bulkVal ); 396 } 397 } 398 // String bulkVal = buf.substring( 1 ); // 先?のコロンを?ずす 399 400 // return sql.replace( "{@" + bulkKey + "}" ,bulkVal ); 401 return sqls; 402 } 403 404 /** 405 * プロセスの処?果のレポ?ト表現を返します? 406 * 処??ログラ?、?力件数、?力件数などの??です? 407 * こ???をそのまま、標準?力に出すことで、結果レポ?トと出来るよ? 408 * 形式で出してください? 409 * 410 * @return 処?果のレポ?? 411 */ 412 public String report() { 413 String report = "[" + getClass().getName() + "]" + CR 414 + TAB + "Action : " + actionCmd + CR 415 + TAB + "DBID : " + dbid + CR 416 + TAB + "sqlCount : " + sqlCount + CR 417 + TAB + "setCount : " + setCount + CR 418 + TAB + "outCount : " + outCount ; 419 420 return report ; 421 } 422 423 /** 424 * こ?クラスの使用方法を返します? 425 * 426 * @return こ?クラスの使用方? 427 */ 428 public String usage() { 429 StringBuilder buf = new StringBuilder(); 430 431 buf.append( "Process_BulkQueryは、データベ?スから読み取った?容を??処?るために? ).append( CR ); 432 buf.append( "ParamProcess のサブクラス(Process_DBParam)にセ?したり??したりす? ).append( CR ); 433 buf.append( "FirstProcess と、ChainProcess のインターフェースを両方持った?実?ラスです?" ).append( CR ); 434 buf.append( CR ); 435 buf.append( "こ?クラスは、上流から?下流への処???度しか実行されません? ).append( CR ); 436 buf.append( "FirstProcess の検索結果は、Set オブジェクトとして、Process_DBParam に渡します?" ).append( CR ); 437 buf.append( "ChainProcess は、その結果を取り?し?自??身の処?果と合せて?します?" ).append( CR ); 438 buf.append( CR ); 439 buf.append( "FirstProcess では?action は、query のみです?" ).append( CR ); 440 buf.append( " query は、指定?SQL?実行し、結果のSetをParamProcessに設定します?" ).append( CR ); 441 buf.append( "ChainProcess では?action は、query、bulkSet、minus、intersect が指定できます?" ).append( CR ); 442 buf.append( " query は、上記と同じです?" ).append( CR ); 443 buf.append( " minus は、?のSetから、SQL??実行結果を引き算し、結果Setを?設定します?" ).append( CR ); 444 buf.append( " intersect は、?のSetから、SQL??実行結果と重?る結果Setを?設定します?" ).append( CR ); 445 buf.append( " bulkSet は、?のSetを取り?し?SQL??して処?ます?" ).append( CR ); 446 buf.append( CR ); 447 buf.append( "流れ?は、query で検索し?minusまた?intersect でSetオブジェクトを?し?" ).append( CR ); 448 buf.append( "bulkSet で利用します?例えば、ORACLEから、ユニ?クキーのSetを作?し?" ).append( CR ); 449 buf.append( "SQLServerのユニ?クキーをminusした結果を?ORACLEからDELETEすれば、不要な" ).append( CR ); 450 buf.append( "??タを削除するなどの処?実行可能になります?また?単純に、query ?を?" ).append( CR ); 451 buf.append( "チェインすれば、単発のUPDATE?実行することが可能です?" ).append( CR ); 452 buf.append( CR ); 453 buf.append( "??タベ?ス接続?等?、ParamProcess のサブクラス(Process_DBParam)に" ).append( CR ); 454 buf.append( "設定された接?Connection)を使用します?" ).append( CR ); 455 buf.append( CR ); 456 buf.append( "引数??中に空白を含??合?、ダブルコー??ション(\"\") で括って下さ??" ).append( CR ); 457 buf.append( "引数??の ?』?前後には、空白は挟めません。??key=value の様に" ).append( CR ); 458 buf.append( "繋げてください? ).append( CR ); 459 buf.append( CR ); 460 buf.append( "SQL?は、{@DATE.YMDH}等?シス?変数が使用できます?" ).append( CR ); 461 buf.append( CR ).append( CR ); 462 463 buf.append( getArgument().usage() ).append( CR ); 464 465 return buf.toString(); 466 } 467 468 /** 469 * こ?クラスは、main メソ?から実行できません? 470 * 471 * @param args コマンド引数配? 472 */ 473 public static void main( final String[] args ) { 474 LogWriter.log( new Process_BulkQuery().usage() ); 475 } 476 }