001 /* 002 * Copyright (c) 2009 The openGion Project. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, 013 * either express or implied. See the License for the specific language 014 * governing permissions and limitations under the License. 015 */ 016 package org.opengion.fukurou.process; 017 018 import org.opengion.fukurou.util.Argument; 019 import org.opengion.fukurou.util.SystemParameter; 020 import org.opengion.fukurou.util.LogWriter; 021 import org.opengion.fukurou.util.HybsEntry ; 022 import org.opengion.fukurou.util.Closer; 023 import org.opengion.fukurou.model.Formatter; 024 import org.opengion.fukurou.db.ConnectionFactory; 025 026 import java.util.Map ; 027 import java.util.LinkedHashMap ; 028 029 import java.sql.Connection; 030 import java.sql.PreparedStatement; 031 import java.sql.ParameterMetaData; 032 import java.sql.SQLException; 033 034 /** 035 * Process_DBMerge は、UPDATE と INSERT を指定し ??タベ?スを追?新 036 * する、ChainProcess インターフェースの実?ラスです? 037 * 上?プロセスチェインの??タは上流から下流へと渡されます?)から 038 * 受け取っ?LineModel を?に、DBTableModel 形式ファイルを?力します? 039 * 040 * ??タベ?ス接続?等?、ParamProcess のサブクラス(Process_DBParam)に 041 * 設定された接?Connection)を使用します? 042 * 043 * 引数??中にスペ?スを含??合?、ダブルコー??ション("") で括って下さ?? 044 * 引数??の ?』?前後には、スペ?スは挟めません。??key=value の様に 045 * 繋げてください? 046 * 047 * SQL?は、{@DATE.YMDH}等?シス?変数が使用できます? 048 * 049 * @og.formSample 050 * Process_DBMerge -dbid=DBGE -insertTable=GE41 051 * 052 * [ -dbid=DB接続ID ] ??-dbid=DBGE (? Process_DBParam の -configFile で?す?DBConfig.xml ファイルで規? 053 * [ -update=検索SQL? ] ??-update="UPDATE GE41 SET NAME_JA = [NAME_JA],LABEL_NAME = [LABEL_NAME] 054 * WHERE SYSTEM_ID = [SYSTEM_ID] AND CLM = [CLM]" 055 * [ -updateFile=登録SQL?ァ??? ] ??-updateFile=update.sql 056 * ?? -update ?-updateFile が指定されな??合?、エラーです? 057 * [ -update_XXXX=固定? ] ??-update_SYSTEM_ID=GE 058 * SQL?の{@XXXX}??を指定?固定?で置き換えます? 059 * WHERE SYSTEM_ID='{@SYSTEM_ID}' ?WHERE SYSTEM_ID='GE' 060 * [ -insertTable=登録???゙ルID ] ??INSERT??する?合?不要?INSERT する場合???ブルID 061 * [ -insert=検索SQL? ] ??-insert="INSERT INTO GE41 (SYSTEM_ID,CLM,NAME_JA,LABEL_NAME) 062 * VALUES ([SYSTEM_ID],[CLM],[NAME_JA],[LABEL_NAME])" 063 * [ -insertFile=登録SQL?ァ??? ] ??-insertFile=insert.sql 064 * ?? -insert ?-insertFile ??-table が指定されな??合?、エラーです? 065 * [ -insert_XXXX=固定? ] ??-insert_SYSTEM_ID=GE 066 * SQL?の{@XXXX}??を指定?固定?で置き換えます? 067 * WHERE SYSTEM_ID='{@SYSTEM_ID}' ?WHERE SYSTEM_ID='GE' 068 * [ -const_XXXX=固定? ] ??-const_FGJ=1 069 * LineModel のキー(const_ に続く??)の値に、固定?を設定します? 070 * キーが異なれ?、?のカラ?を指定できます? 071 * [ -commitCnt=commit処?定] ???数毎にコミットを発行します?0 の場合?、終?でコミットしません? 072 * [ -display=false|true ] ??結果を標準?力に表示する(true)かしな?false)?初期値:false[表示しない]) 073 * 074 * @version 4.0 075 * @author Kazuhiko Hasegawa 076 * @since JDK5.0, 077 */ 078 public class Process_DBMerge extends AbstractProcess implements ChainProcess { 079 private static final String UPDATE_KEY = "update_" ; 080 private static final String INSERT_KEY = "insert_" ; 081 private static final String CNST_KEY = "const_" ; 082 083 private Connection connection = null; 084 private PreparedStatement insPstmt = null ; 085 private PreparedStatement updPstmt = null ; 086 private ParameterMetaData insPmeta = null ; // 5.1.2.0 (2010/01/01) setObject に、Type を渡す?(PostgreSQL対? 087 private ParameterMetaData updPmeta = null ; // 5.1.2.0 (2010/01/01) setObject に、Type を渡す?(PostgreSQL対? 088 private boolean useParamMetaData = false; // 5.1.2.0 (2010/01/01) setObject に、Type を渡す?(PostgreSQL対? 089 090 private String dbid = null; 091 private String insert = null; 092 private String update = null; 093 private String insertTable = null; 094 private int[] insClmNos = null; // insert 時?ファイルのヘッ??のカラ?号 095 private int[] updClmNos = null; // update 時?ファイルのヘッ??のカラ?号 096 private int commitCnt = 0; // コミットするまとめ件数 097 private boolean display = false; // 表示しな? 098 099 private String[] cnstClm = null; // 固定?を設定するカラ? 100 private int[] cnstClmNos = null; // 固定?を設定するカラ?号 101 private String[] constVal = null; // カラ?号に対応した固定? 102 103 private boolean firstRow = true; // ??の?目 104 private int count = 0; 105 private int insCount = 0; 106 private int updCount = 0; 107 108 private static final Map<String,String> mustProparty ; // ?プロパティ???チェ?用 Map 109 private static final Map<String,String> usableProparty ; // ?プロパティ?整合?チェ? Map 110 111 static { 112 mustProparty = new LinkedHashMap<String,String>(); 113 114 usableProparty = new LinkedHashMap<String,String>(); 115 usableProparty.put( "dbid", "Process_DBParam の -configFile で?す?DBConfig.xml ファイルで規? ); 116 usableProparty.put( "update", "更新SQL?sql or sqlFile ??)" + 117 CR + "? \"UPDATE GE41 " + 118 CR + "SET NAME_JA = [NAME_JA],LABEL_NAME = [LABEL_NAME] " + 119 CR + "WHERE SYSTEM_ID = [SYSTEM_ID] AND CLM = [CLM]\"" ); 120 usableProparty.put( "updateFile", "更新SQLファイル(sql or sqlFile ??)? update.sql" ); 121 usableProparty.put( "update_", "SQL?の{@XXXX}??を指定?固定?で置き換えます?" + 122 CR + "WHERE SYSTEM_ID='{@SYSTEM_ID}' ?WHERE SYSTEM_ID='GE'" ); 123 usableProparty.put( "insert", "登録SQL?sql or sqlFile ??)" + 124 CR + "? \"INSERT INTO GE41 " + 125 CR + "(SYSTEM_ID,CLM,NAME_JA,LABEL_NAME) " + 126 CR + "VALUES ([SYSTEM_ID],[CLM],[NAME_JA],[LABEL_NAME])\"" ); 127 usableProparty.put( "insertFile", "登録SQLファイル(sql or sqlFile ??)? insert.sql" ); 128 usableProparty.put( "insertTable", "INSERT する場合???ブルID SQL??する?合?不要?" ); 129 usableProparty.put( "insert_", "SQL?の{@XXXX}??を指定?固定?で置き換えます?" + 130 CR + "WHERE SYSTEM_ID='{@SYSTEM_ID}' ?WHERE SYSTEM_ID='GE'" ); 131 usableProparty.put( "const_", "LineModel のキー(const_ に続く??)の値に、固定?? + 132 CR + "設定します?キーが異なれ?、?のカラ?を指定できます?" + 133 CR + "? -sql_SYSTEM_ID=GE" ); 134 usableProparty.put( "commitCnt", "?数毎にコミットを発行します?" + 135 CR + "0 の場合?、終?でコミットしません(初期値: 0)" ); 136 usableProparty.put( "display", "結果を標準?力に表示する(true)かしな?false)? + 137 CR + "(初期値:false:表示しな?" ); 138 } 139 140 /** 141 * ?ォルトコンストラクター? 142 * こ?クラスは、動??されます??ォルトコンストラクターで? 143 * super クラスに対して、?な初期化を行っておきます? 144 * 145 */ 146 public Process_DBMerge() { 147 super( "org.opengion.fukurou.process.Process_DBMerge",mustProparty,usableProparty ); 148 } 149 150 /** 151 * プロセスの初期化を行います?初めに??、呼び出されます? 152 * 初期処?ファイルオープン??オープン?に使用します? 153 * 154 * @og.rev 5.1.2.0 (2010/01/01) setObject に ParameterMetaData の getParameterType を渡す?(PostgreSQL対? 155 * @og.rev 5.3.8.0 (2011/08/01) useParamMetaData ?ConnectionFactory経由で取得?(PostgreSQL対? 156 * 157 * @param paramProcess ??タベ?スの接続???などを持って?オブジェク? 158 */ 159 public void init( final ParamProcess paramProcess ) { 160 Argument arg = getArgument(); 161 162 insertTable = arg.getProparty("insertTable"); 163 update = arg.getFileProparty("update","updateFile",false); 164 insert = arg.getFileProparty("insert","insertFile",false); 165 commitCnt = arg.getProparty("commitCnt",commitCnt); 166 display = arg.getProparty("display",display); 167 168 dbid = arg.getProparty("dbid"); 169 connection = paramProcess.getConnection( dbid ); 170 // 5.1.2.0 (2010/01/01) setObject に ParameterMetaData の getParameterType を渡す?(PostgreSQL対? 171 // useParamMetaData = ApplicationInfo.useParameterMetaData( connection ); 172 useParamMetaData = ConnectionFactory.useParameterMetaData( dbid ); // 5.3.8.0 (2011/08/01) 173 174 if( insert == null && insertTable == null ) { 175 String errMsg = "insert また?、insertFile を指定しな??合?、insertTable を??してください?; 176 throw new RuntimeException( errMsg ); 177 } 178 179 if( insert != null && insertTable != null ) { 180 String errMsg = "insert また?、insertFile と、insertTable は、両方同時に?できません?" 181 + insert + "],[" + insertTable + "]"; 182 throw new RuntimeException( errMsg ); 183 } 184 185 // 3.8.0.1 (2005/06/17) {@DATE.XXXX} 変換処??追? 186 // {@DATE.YMDH} などの??を?yyyyMMddHHmmss 型?日付に置き換えます? 187 // SQL?? {@XXXX} ??の固定?への置き換? 188 HybsEntry[] entry =arg.getEntrys(UPDATE_KEY); // 配? 189 SystemParameter sysParam = new SystemParameter( update ); 190 update = sysParam.replace( entry ); 191 192 if( insert != null ) { 193 entry =arg.getEntrys(INSERT_KEY); // 配? 194 sysParam = new SystemParameter( insert ); 195 insert = sysParam.replace( entry ); 196 } 197 198 HybsEntry[] cnstKey = arg.getEntrys( CNST_KEY ); // 配? 199 int csize = cnstKey.length; 200 cnstClm = new String[csize]; 201 constVal = new String[csize]; 202 for( int i=0; i<csize; i++ ) { 203 cnstClm[i] = cnstKey[i].getKey(); 204 constVal[i] = cnstKey[i].getValue(); 205 } 206 } 207 208 /** 209 * プロセスの終?行います??に??、呼び出されます? 210 * 終???ファイルクローズ??クローズ?に使用します? 211 * 212 * @og.rev 4.0.0.0 (2007/11/27) commit,rollback,remove 処?追? 213 * @og.rev 5.1.2.0 (2010/01/01) insPmeta , updPmeta のクリア 214 * 215 * @param isOK ト?タルで、OK?たかど?[true:成功/false:失敗] 216 */ 217 public void end( final boolean isOK ) { 218 boolean flag1 = Closer.stmtClose( updPstmt ); 219 updPstmt = null; 220 boolean flag2 = Closer.stmtClose( insPstmt ); 221 insPstmt = null; 222 223 insPmeta = null ; // 5.1.2.0 (2010/01/01) 224 updPmeta = null ; // 5.1.2.0 (2010/01/01) 225 226 // close に失敗して?のに commit しても良??か? 227 if( isOK ) { 228 Closer.commit( connection ); 229 } 230 else { 231 Closer.rollback( connection ); 232 } 233 ConnectionFactory.remove( connection,dbid ); 234 235 if( ! flag1 ) { 236 String errMsg = "update ス??トメントをクローズ出来ません? + CR 237 + " update=[" + update + "] , commit=[" + isOK + "]" ; 238 throw new RuntimeException( errMsg ); 239 } 240 241 if( ! flag2 ) { 242 String errMsg = "insert ス??トメントをクローズ出来ません? + CR 243 + " insert=[" + insert + "] , commit=[" + isOK + "]" ; 244 throw new RuntimeException( errMsg ); 245 } 246 } 247 248 /** 249 * 引数の LineModel を??るメソ?です? 250 * 変換処?? LineModel を返します? 251 * 後続??行わな?????タのフィルタリングを行う場?は? 252 * null ??タを返します?つまり?null ??タは、後続??行わな? 253 * フラグの代わりにも使用して?す? 254 * なお?変換処?? LineModel と、オリジナルの LineModel が? 255 * 同?、コピ?(クローン)か?、各処?ソ??決めて?す? 256 * ドキュメントに明記されて???合?、副作用が問題になる?合?? 257 * ???とに自?コピ?(クローン)して下さ?? 258 * 259 * @og.rev 5.1.2.0 (2010/01/01) setObject に ParameterMetaData の getParameterType を渡す?(PostgreSQL対? 260 * @og.rev 5.3.8.0 (2011/08/01) useParamMetaData setNull 対?PostgreSQL対? 261 * 262 * @param data ラインモ? オリジナルのLineModel 263 * 264 * @return 処?換後?LineModel 265 */ 266 public LineModel action( final LineModel data ) { 267 count++ ; 268 int updCnt = 0; 269 try { 270 if( firstRow ) { 271 makePrepareStatement( insertTable,data ); 272 273 int size = cnstClm.length; 274 cnstClmNos = new int[size]; 275 for( int i=0; i<size; i++ ) { 276 cnstClmNos[i] = data.getColumnNo( cnstClm[i] ); 277 } 278 279 firstRow = false; 280 } 281 282 // 固定?置き換え?? 283 for( int j=0; j<cnstClmNos.length; j++ ) { 284 data.setValue( cnstClmNos[j],constVal[j] ); 285 } 286 287 // 5.1.2.0 (2010/01/01) setObject に ParameterMetaData の getParameterType を渡す?(PostgreSQL対? 288 if( useParamMetaData ) { 289 for( int i=0; i<updClmNos.length; i++ ) { 290 int type = updPmeta.getParameterType( i+1 ); 291 // 5.3.8.0 (2011/08/01) setNull 対? 292 // updPstmt.setObject( i+1,data.getValue(updClmNos[i]),type ); 293 Object val = data.getValue(updClmNos[i]); 294 if( val == null || ( val instanceof String && ((String)val).isEmpty() ) ) { 295 updPstmt.setNull( i+1, type ); 296 } 297 else { 298 updPstmt.setObject( i+1, val, type ); 299 } 300 } 301 } 302 else { 303 for( int i=0; i<updClmNos.length; i++ ) { 304 updPstmt.setObject( i+1,data.getValue(updClmNos[i]) ); 305 } 306 } 307 308 updCnt = updPstmt.executeUpdate(); 309 if( updCnt == 0 ) { 310 // 5.1.2.0 (2010/01/01) setObject に ParameterMetaData の getParameterType を渡す?(PostgreSQL対? 311 if( useParamMetaData ) { 312 for( int i=0; i<insClmNos.length; i++ ) { 313 int type = insPmeta.getParameterType( i+1 ); 314 // 5.3.8.0 (2011/08/01) setNull 対? 315 // insPstmt.setObject( i+1,data.getValue(insClmNos[i]),type ); 316 Object val = data.getValue(insClmNos[i]); 317 if( val == null || ( val instanceof String && ((String)val).isEmpty() ) ) { 318 insPstmt.setNull( i+1, type ); 319 } 320 else { 321 insPstmt.setObject( i+1, val, type ); 322 } 323 } 324 } 325 else { 326 for( int i=0; i<insClmNos.length; i++ ) { 327 insPstmt.setObject( i+1,data.getValue(insClmNos[i]) ); 328 } 329 } 330 int insCnt = insPstmt.executeUpdate(); 331 if( insCnt == 0 ) { 332 String errMsg = "?件も追?れませんでした? + CR 333 + " insert=[" + insert + "]" + CR 334 + "[" + data.getRowNo() + "]件目" + CR ; 335 throw new RuntimeException( errMsg ); 336 } 337 insCount++ ; 338 } 339 else if( updCnt > 1 ) { 340 String errMsg = "?行が同時に更新されました?" + updCnt + "]件" + CR 341 + " update=[" + update + "]" + CR 342 + "[" + data.getRowNo() + "]件目" + CR ; 343 throw new RuntimeException( errMsg ); 344 } 345 else { 346 updCount ++ ; 347 } 348 349 if( commitCnt > 0 && ( count%commitCnt == 0 ) ) { 350 Closer.commit( connection ); 351 } 352 if( display ) { printKey( count,updCnt,data ); } 353 } 354 catch (SQLException ex) { 355 String errMsg = "登録処?エラーが発生しました? + CR 356 + ((updCnt == 1) ? 357 " update=[" + update + "]" 358 : " insert=[" + insert + "]" + CR 359 + " insertTable=[" + insertTable + "]" ) 360 + CR 361 + "[" + data.getRowNo() + "]件目" + CR 362 + "errorCode=[" + ex.getErrorCode() + "] State=[" 363 + ex.getSQLState() + "]" + CR ; 364 throw new RuntimeException( errMsg,ex ); 365 } 366 return data; 367 } 368 369 /** 370 * ?で使用する PreparedStatement を作?します? 371 * 引数?? SQL また?、LineModel から作?した SQL より構築します? 372 * 373 * @og.rev 5.1.2.0 (2010/01/01) setObject に ParameterMetaData の getParameterType を渡す?(PostgreSQL対? 374 * 375 * @param table 処?象の??ブルID 376 * @param data ラインモ? 処?象のLineModel 377 */ 378 private void makePrepareStatement( final String table,final LineModel data ) { 379 if( insert == null ) { 380 StringBuilder buf = new StringBuilder(); 381 String[] names = data.getNames(); 382 int size = names.length; 383 384 buf.append( "INSERT INTO " ).append( table ).append( " (" ); 385 buf.append( names[0] ); 386 for( int i=1; i<size; i++ ) { 387 buf.append( "," ).append( names[i] ); 388 } 389 buf.append( " ) VALUES ( ?" ); 390 for( int i=1; i<size; i++ ) { 391 buf.append( ",?" ); 392 } 393 buf.append( " )" ); 394 insert = buf.toString(); 395 396 // カラ?号を設定します? 397 insClmNos = new int[size]; 398 for( int i=0; i<size; i++ ) { 399 insClmNos[i] = i; 400 } 401 } 402 else { 403 Formatter format = new Formatter( data ); 404 format.setFormat( insert ); 405 insert = format.getQueryFormatString(); 406 insClmNos = format.getClmNos(); 407 } 408 409 Formatter format = new Formatter( data ); 410 format.setFormat( update ); 411 update = format.getQueryFormatString(); 412 updClmNos = format.getClmNos(); 413 414 try { 415 insPstmt = connection.prepareStatement( insert ); 416 updPstmt = connection.prepareStatement( update ); 417 // 5.1.2.0 (2010/01/01) setObject に ParameterMetaData の getParameterType を渡す?(PostgreSQL対? 418 if( useParamMetaData ) { 419 insPmeta = insPstmt.getParameterMetaData(); 420 updPmeta = updPstmt.getParameterMetaData(); 421 } 422 } 423 catch (SQLException ex) { 424 String errMsg = "PreparedStatement を取得できませんでした? + CR 425 + "insert=[" + insert + "]" + CR 426 + "update=[" + update + "]" + CR 427 + "table=[" + table + "]" + CR 428 + "nameLine=[" + data.nameLine() + "]" ; 429 throw new RuntimeException( errMsg,ex ); 430 } 431 } 432 433 /** 434 * 画面出力用のフォーマットを作?します? 435 * 436 * @param rowNo ??タ読み取り件数 437 * @param updCnt 更新件数 438 * @param data ラインモ? 439 */ 440 private void printKey( final int rowNo , final int updCnt , final LineModel data ) { 441 StringBuilder buf = new StringBuilder(); 442 443 if( updCnt > 0 ) { buf.append( "UPDATE " ); } 444 else { buf.append( "INSERT " ); } 445 446 buf.append( "row=[" ).append( rowNo ).append( "] : " ); 447 for( int i=0; i < updClmNos.length; i++ ) { 448 if( i == 0 ) { buf.append( "key: " ); } 449 else { buf.append( " and " ); } 450 buf.append( data.getName( updClmNos[i] ) ); 451 buf.append( " = " ); 452 buf.append( data.getValue( updClmNos[i] ) ); 453 } 454 455 println( buf.toString() ); 456 } 457 458 /** 459 * プロセスの処?果のレポ?ト表現を返します? 460 * 処??ログラ?、?力件数、?力件数などの??です? 461 * こ???をそのまま、標準?力に出すことで、結果レポ?トと出来るよ? 462 * 形式で出してください? 463 * 464 * @return 処?果のレポ?? 465 */ 466 public String report() { 467 String report = "[" + getClass().getName() + "]" + CR 468 + TAB + "DBID : " + dbid + CR 469 + TAB + "Input Count : " + count + CR 470 + TAB + "Update Count : " + updCount + CR 471 + TAB + "Insert Count : " + insCount ; 472 473 return report ; 474 } 475 476 /** 477 * こ?クラスの使用方法を返します? 478 * 479 * @return こ?クラスの使用方? 480 */ 481 public String usage() { 482 StringBuilder buf = new StringBuilder(); 483 484 buf.append( "Process_DBMerge は、UPDATE と INSERT を指定し ??タベ?スを追?新" ).append( CR ); 485 buf.append( "する、ChainProcess インターフェースの実?ラスです?" ).append( CR ); 486 buf.append( "上?プロセスチェインの??タは上流から下流へと渡されます?)から" ).append( CR ); 487 buf.append( "受け取っ?LineModel を?に、データベ?スの存在チェ?を行い? ).append( CR ); 488 buf.append( "下流への処?振り?けます?" ).append( CR ); 489 buf.append( CR ); 490 buf.append( "??タベ?ス接続?等?、ParamProcess のサブクラス(Process_DBParam)に" ).append( CR ); 491 buf.append( "設定された接?Connection)を使用します?" ).append( CR ); 492 buf.append( CR ); 493 buf.append( "引数??中に空白を含??合?、ダブルコー??ション(\"\") で括って下さ??" ).append( CR ); 494 buf.append( "引数??の ?』?前後には、空白は挟めません。??key=value の様に" ).append( CR ); 495 buf.append( "繋げてください? ).append( CR ); 496 buf.append( CR ); 497 buf.append( "SQL?は、{@DATE.YMDH}等?シス?変数が使用できます?" ).append( CR ); 498 buf.append( CR ).append( CR ); 499 buf.append( getArgument().usage() ).append( CR ); 500 501 return buf.toString(); 502 } 503 504 /** 505 * こ?クラスは、main メソ?から実行できません? 506 * 507 * @param args コマンド引数配? 508 */ 509 public static void main( final String[] args ) { 510 LogWriter.log( new Process_DBMerge().usage() ); 511 } 512 }