001/* 002 * Copyright (c) 2009 The openGion Project. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, 013 * either express or implied. See the License for the specific language 014 * governing permissions and limitations under the License. 015 */ 016package org.opengion.fukurou.process; 017 018import org.opengion.fukurou.util.Argument; 019import org.opengion.fukurou.util.SystemParameter; 020import org.opengion.fukurou.util.LogWriter; 021import org.opengion.fukurou.util.HybsEntry ; 022import org.opengion.fukurou.util.Closer; 023import org.opengion.fukurou.model.Formatter; 024import org.opengion.fukurou.db.ConnectionFactory; 025 026import java.util.Map ; 027import java.util.LinkedHashMap ; 028 029import java.sql.Connection; 030import java.sql.PreparedStatement; 031import java.sql.ParameterMetaData; 032import java.sql.SQLException; 033 034/** 035 * Process_DBMerge は、UPDATE と INSERT を指定し データベースを追加更新 036 * する、ChainProcess インターフェースの実装クラスです。 037 * 上流(プロセスチェインのデータは上流から下流へと渡されます。)から 038 * 受け取った LineModel を元に、DBTableModel 形式ファイルを出力します。 039 * 040 * データベース接続先等は、ParamProcess のサブクラス(Process_DBParam)に 041 * 設定された接続(Connection)を使用します。 042 * 043 * 引数文字列中にスペースを含む場合は、ダブルコーテーション("") で括って下さい。 044 * 引数文字列の 『=』の前後には、スペースは挟めません。必ず、-key=value の様に 045 * 繋げてください。 046 * 047 * SQL文には、{@DATE.YMDH}等のシステム変数が使用できます。 048 * 049 * @og.formSample 050 * Process_DBMerge -dbid=DBGE -insertTable=GE41 051 * 052 * [ -dbid=DB接続ID ] : -dbid=DBGE (例: Process_DBParam の -configFile で指定する DBConfig.xml ファイルで規定) 053 * [ -update=検索SQL文 ] : -update="UPDATE GE41 SET NAME_JA = [NAME_JA],LABEL_NAME = [LABEL_NAME] 054 * WHERE SYSTEM_ID = [SYSTEM_ID] AND CLM = [CLM]" 055 * [ -updateFile=登録SQLファイル ] : -updateFile=update.sql 056 * : -update や -updateFile が指定されない場合は、エラーです。 057 * [ -update_XXXX=固定値 ] : -update_SYSTEM_ID=GE 058 * SQL文中の{@XXXX}文字列を指定の固定値で置き換えます。 059 * WHERE SYSTEM_ID='{@SYSTEM_ID}' ⇒ WHERE SYSTEM_ID='GE' 060 * [ -insertTable=登録テーブルID ] : INSERT文を指定する場合は不要。INSERT する場合のテーブルID 061 * [ -insert=検索SQL文 ] : -insert="INSERT INTO GE41 (SYSTEM_ID,CLM,NAME_JA,LABEL_NAME) 062 * VALUES ([SYSTEM_ID],[CLM],[NAME_JA],[LABEL_NAME])" 063 * [ -insertFile=登録SQLファイル ] : -insertFile=insert.sql 064 * : -insert や -insertFile や、-table が指定されない場合は、エラーです。 065 * [ -insert_XXXX=固定値 ] : -insert_SYSTEM_ID=GE 066 * SQL文中の{@XXXX}文字列を指定の固定値で置き換えます。 067 * WHERE SYSTEM_ID='{@SYSTEM_ID}' ⇒ WHERE SYSTEM_ID='GE' 068 * [ -const_XXXX=固定値 ] : -const_FGJ=1 069 * LineModel のキー(const_ に続く文字列)の値に、固定値を設定します。 070 * キーが異なれば、複数のカラム名を指定できます。 071 * [ -commitCnt=commit処理指定] : 指定数毎にコミットを発行します。0 の場合は、終了までコミットしません。 072 * [ -display=[false/true] ] : 結果を標準出力に表示する(true)かしない(false)か(初期値:false[表示しない]) 073 * [ -debug=[false/true] ] : デバッグ情報を標準出力に表示する(true)かしない(false)か(初期値:false[表示しない]) 074 * 075 * @version 4.0 076 * @author Kazuhiko Hasegawa 077 * @since JDK5.0, 078 */ 079public class Process_DBMerge extends AbstractProcess implements ChainProcess { 080 private static final String UPDATE_KEY = "update_" ; 081 private static final String INSERT_KEY = "insert_" ; 082 private static final String CNST_KEY = "const_" ; 083 084 private Connection connection = null; 085 private PreparedStatement insPstmt = null ; 086 private PreparedStatement updPstmt = null ; 087 private ParameterMetaData insPmeta = null ; // 5.1.2.0 (2010/01/01) setObject に、Type を渡す。(PostgreSQL対応) 088 private ParameterMetaData updPmeta = null ; // 5.1.2.0 (2010/01/01) setObject に、Type を渡す。(PostgreSQL対応) 089 private boolean useParamMetaData = false; // 5.1.2.0 (2010/01/01) setObject に、Type を渡す。(PostgreSQL対応) 090 091 private String dbid = null; 092 private String insert = null; 093 private String update = null; 094 private String insertTable = null; 095 private int[] insClmNos = null; // insert 時のファイルのヘッダーのカラム番号 096 private int[] updClmNos = null; // update 時のファイルのヘッダーのカラム番号 097 private int commitCnt = 0; // コミットするまとめ件数 098 private boolean display = false; // 表示しない 099 private boolean debug = false; // 5.7.3.0 (2014/02/07) デバッグ情報 100 101 private String[] cnstClm = null; // 固定値を設定するカラム名 102 private int[] cnstClmNos = null; // 固定値を設定するカラム番号 103 private String[] constVal = null; // カラム番号に対応した固定値 104 105 private boolean firstRow = true; // 最初の一行目 106 private int count = 0; 107 private int insCount = 0; 108 private int updCount = 0; 109 110 private static final Map<String,String> mustProparty ; // [プロパティ]必須チェック用 Map 111 private static final Map<String,String> usableProparty ; // [プロパティ]整合性チェック Map 112 113 static { 114 mustProparty = new LinkedHashMap<String,String>(); 115 116 usableProparty = new LinkedHashMap<String,String>(); 117 usableProparty.put( "dbid", "Process_DBParam の -configFile で指定する DBConfig.xml ファイルで規定" ); 118 usableProparty.put( "update", "更新SQL文(sql or sqlFile 必須)" + 119 CR + "例: \"UPDATE GE41 " + 120 CR + "SET NAME_JA = [NAME_JA],LABEL_NAME = [LABEL_NAME] " + 121 CR + "WHERE SYSTEM_ID = [SYSTEM_ID] AND CLM = [CLM]\"" ); 122 usableProparty.put( "updateFile", "更新SQLファイル(sql or sqlFile 必須)例: update.sql" ); 123 usableProparty.put( "update_", "SQL文中の{@XXXX}文字列を指定の固定値で置き換えます。" + 124 CR + "WHERE SYSTEM_ID='{@SYSTEM_ID}' ⇒ WHERE SYSTEM_ID='GE'" ); 125 usableProparty.put( "insert", "登録SQL文(sql or sqlFile 必須)" + 126 CR + "例: \"INSERT INTO GE41 " + 127 CR + "(SYSTEM_ID,CLM,NAME_JA,LABEL_NAME) " + 128 CR + "VALUES ([SYSTEM_ID],[CLM],[NAME_JA],[LABEL_NAME])\"" ); 129 usableProparty.put( "insertFile", "登録SQLファイル(sql or sqlFile 必須)例: insert.sql" ); 130 usableProparty.put( "insertTable", "INSERT する場合のテーブルID SQL文を指定する場合は不要。" ); 131 usableProparty.put( "insert_", "SQL文中の{@XXXX}文字列を指定の固定値で置き換えます。" + 132 CR + "WHERE SYSTEM_ID='{@SYSTEM_ID}' ⇒ WHERE SYSTEM_ID='GE'" ); 133 usableProparty.put( "const_", "LineModel のキー(const_ に続く文字列)の値に、固定値を" + 134 CR + "設定します。キーが異なれば、複数のカラム名を指定できます。" + 135 CR + "例: -sql_SYSTEM_ID=GE" ); 136 usableProparty.put( "commitCnt", "指定数毎にコミットを発行します。" + 137 CR + "0 の場合は、終了までコミットしません(初期値: 0)" ); 138 usableProparty.put( "display", "結果を標準出力に表示する(true)かしない(false)か" + 139 CR + "(初期値:false:表示しない)" ); 140 usableProparty.put( "debug", "デバッグ情報を標準出力に表示する(true)かしない(false)か" + 141 CR + "(初期値:false:表示しない)" ); // 5.7.3.0 (2014/02/07) デバッグ情報 142 } 143 144 /** 145 * デフォルトコンストラクター。 146 * このクラスは、動的作成されます。デフォルトコンストラクターで、 147 * super クラスに対して、必要な初期化を行っておきます。 148 * 149 */ 150 public Process_DBMerge() { 151 super( "org.opengion.fukurou.process.Process_DBMerge",mustProparty,usableProparty ); 152 } 153 154 /** 155 * プロセスの初期化を行います。初めに一度だけ、呼び出されます。 156 * 初期処理(ファイルオープン、DBオープン等)に使用します。 157 * 158 * @og.rev 5.1.2.0 (2010/01/01) setObject に ParameterMetaData の getParameterType を渡す。(PostgreSQL対応) 159 * @og.rev 5.3.8.0 (2011/08/01) useParamMetaData を ConnectionFactory経由で取得。(PostgreSQL対応) 160 * 161 * @param paramProcess データベースの接続先情報などを持っているオブジェクト 162 */ 163 public void init( final ParamProcess paramProcess ) { 164 Argument arg = getArgument(); 165 166 insertTable = arg.getProparty("insertTable"); 167 update = arg.getFileProparty("update","updateFile",false); 168 insert = arg.getFileProparty("insert","insertFile",false); 169 commitCnt = arg.getProparty("commitCnt",commitCnt); 170 display = arg.getProparty("display",display); 171 debug = arg.getProparty("debug",debug); // 5.7.3.0 (2014/02/07) デバッグ情報 172 173 dbid = arg.getProparty("dbid"); 174 connection = paramProcess.getConnection( dbid ); 175 // 5.1.2.0 (2010/01/01) setObject に ParameterMetaData の getParameterType を渡す。(PostgreSQL対応) 176 useParamMetaData = ConnectionFactory.useParameterMetaData( dbid ); // 5.3.8.0 (2011/08/01) 177 178 if( insert == null && insertTable == null ) { 179 String errMsg = "insert または、insertFile を指定しない場合は、insertTable を必ず指定してください。"; 180 throw new RuntimeException( errMsg ); 181 } 182 183 if( insert != null && insertTable != null ) { 184 String errMsg = "insert または、insertFile と、insertTable は、両方同時に指定できません。[" 185 + insert + "],[" + insertTable + "]"; 186 throw new RuntimeException( errMsg ); 187 } 188 189 // 3.8.0.1 (2005/06/17) {@DATE.XXXX} 変換処理の追加 190 // {@DATE.YMDH} などの文字列を、yyyyMMddHHmmss 型の日付に置き換えます。 191 // SQL文の {@XXXX} 文字列の固定値への置き換え 192 HybsEntry[] entry =arg.getEntrys(UPDATE_KEY); // 配列 193 SystemParameter sysParam = new SystemParameter( update ); 194 update = sysParam.replace( entry ); 195 196 if( insert != null ) { 197 entry =arg.getEntrys(INSERT_KEY); // 配列 198 sysParam = new SystemParameter( insert ); 199 insert = sysParam.replace( entry ); 200 } 201 202 HybsEntry[] cnstKey = arg.getEntrys( CNST_KEY ); // 配列 203 int csize = cnstKey.length; 204 cnstClm = new String[csize]; 205 constVal = new String[csize]; 206 for( int i=0; i<csize; i++ ) { 207 cnstClm[i] = cnstKey[i].getKey(); 208 constVal[i] = cnstKey[i].getValue(); 209 } 210 } 211 212 /** 213 * プロセスの終了を行います。最後に一度だけ、呼び出されます。 214 * 終了処理(ファイルクローズ、DBクローズ等)に使用します。 215 * 216 * @og.rev 4.0.0.0 (2007/11/27) commit,rollback,remove 処理を追加 217 * @og.rev 5.1.2.0 (2010/01/01) insPmeta , updPmeta のクリア 218 * 219 * @param isOK トータルで、OKだったかどうか[true:成功/false:失敗] 220 */ 221 public void end( final boolean isOK ) { 222 boolean flag1 = Closer.stmtClose( updPstmt ); 223 updPstmt = null; 224 boolean flag2 = Closer.stmtClose( insPstmt ); 225 insPstmt = null; 226 227 insPmeta = null ; // 5.1.2.0 (2010/01/01) 228 updPmeta = null ; // 5.1.2.0 (2010/01/01) 229 230 // close に失敗しているのに commit しても良いのか? 231 if( isOK ) { 232 Closer.commit( connection ); 233 } 234 else { 235 Closer.rollback( connection ); 236 } 237 ConnectionFactory.remove( connection,dbid ); 238 239 if( ! flag1 ) { 240 String errMsg = "update ステートメントをクローズ出来ません。" + CR 241 + " update=[" + update + "] , commit=[" + isOK + "]" ; 242 throw new RuntimeException( errMsg ); 243 } 244 245 if( ! flag2 ) { 246 String errMsg = "insert ステートメントをクローズ出来ません。" + CR 247 + " insert=[" + insert + "] , commit=[" + isOK + "]" ; 248 throw new RuntimeException( errMsg ); 249 } 250 } 251 252 /** 253 * 引数の LineModel を処理するメソッドです。 254 * 変換処理後の LineModel を返します。 255 * 後続処理を行わない場合(データのフィルタリングを行う場合)は、 256 * null データを返します。つまり、null データは、後続処理を行わない 257 * フラグの代わりにも使用しています。 258 * なお、変換処理後の LineModel と、オリジナルの LineModel が、 259 * 同一か、コピー(クローン)かは、各処理メソッド内で決めています。 260 * ドキュメントに明記されていない場合は、副作用が問題になる場合は、 261 * 各処理ごとに自分でコピー(クローン)して下さい。 262 * 263 * @og.rev 5.1.2.0 (2010/01/01) setObject に ParameterMetaData の getParameterType を渡す。(PostgreSQL対応) 264 * @og.rev 5.3.8.0 (2011/08/01) useParamMetaData setNull 対応(PostgreSQL対応) 265 * @og.rev 5.7.2.2 (2014/01/24) SQL実行エラーを少し詳細に出力します。 266 * 267 * @param data ラインモデル オリジナルのLineModel 268 * 269 * @return 処理変換後のLineModel 270 */ 271 public LineModel action( final LineModel data ) { 272 count++ ; 273 int updCnt = 0; 274 try { 275 if( firstRow ) { 276 makePrepareStatement( insertTable,data ); 277 278 int size = cnstClm.length; 279 cnstClmNos = new int[size]; 280 for( int i=0; i<size; i++ ) { 281 cnstClmNos[i] = data.getColumnNo( cnstClm[i] ); 282 } 283 284 firstRow = false; 285 if( display ) { println( data.nameLine() ); } // 5.7.3.0 (2014/02/07) デバッグ情報 286 } 287 288 // 固定値置き換え処理 289 for( int j=0; j<cnstClmNos.length; j++ ) { 290 data.setValue( cnstClmNos[j],constVal[j] ); 291 } 292 293 // 5.1.2.0 (2010/01/01) setObject に ParameterMetaData の getParameterType を渡す。(PostgreSQL対応) 294 if( useParamMetaData ) { 295 for( int i=0; i<updClmNos.length; i++ ) { 296 int type = updPmeta.getParameterType( i+1 ); 297 // 5.3.8.0 (2011/08/01) setNull 対応 298 Object val = data.getValue(updClmNos[i]); 299 if( val == null || ( val instanceof String && ((String)val).isEmpty() ) ) { 300 updPstmt.setNull( i+1, type ); 301 } 302 else { 303 updPstmt.setObject( i+1, val, type ); 304 } 305 } 306 } 307 else { 308 for( int i=0; i<updClmNos.length; i++ ) { 309 updPstmt.setObject( i+1,data.getValue(updClmNos[i]) ); 310 } 311 } 312 313 updCnt = updPstmt.executeUpdate(); 314 if( updCnt == 0 ) { 315 // 5.1.2.0 (2010/01/01) setObject に ParameterMetaData の getParameterType を渡す。(PostgreSQL対応) 316 if( useParamMetaData ) { 317 for( int i=0; i<insClmNos.length; i++ ) { 318 int type = insPmeta.getParameterType( i+1 ); 319 // 5.3.8.0 (2011/08/01) setNull 対応 320 Object val = data.getValue(insClmNos[i]); 321 if( val == null || ( val instanceof String && ((String)val).isEmpty() ) ) { 322 insPstmt.setNull( i+1, type ); 323 } 324 else { 325 insPstmt.setObject( i+1, val, type ); 326 } 327 } 328 } 329 else { 330 for( int i=0; i<insClmNos.length; i++ ) { 331 insPstmt.setObject( i+1,data.getValue(insClmNos[i]) ); 332 } 333 } 334 int insCnt = insPstmt.executeUpdate(); 335 if( insCnt == 0 ) { 336 String errMsg = "1件も追加されませんでした。[" + data.getRowNo() + "]件目" + CR 337 + " insert=[" + insert + "]" + CR 338 + " data=[" + data.dataLine() + "]" + CR ; // 5.7.2.2 (2014/01/24) エラー時にデータも出力します。 339 throw new RuntimeException( errMsg ); 340 } 341 insCount++ ; 342 } 343 else if( updCnt > 1 ) { 344 String errMsg = "複数行(" + updCnt + ")が同時に更新されました。[" + data.getRowNo() + "]件目" + CR 345 + " update=[" + update + "]" + CR 346 + " data=[" + data.dataLine() + "]" + CR ; // 5.7.2.2 (2014/01/24) エラー時にデータも出力します。 347 throw new RuntimeException( errMsg ); 348 } 349 else { 350 updCount ++ ; 351 } 352 353 if( commitCnt > 0 && ( count%commitCnt == 0 ) ) { 354 Closer.commit( connection ); 355 } 356 if( display ) { println( data.dataLine() ); } // 5.7.3.0 (2014/02/07) デバッグ情報 357 } 358 catch (SQLException ex) { 359 String errMsg = "登録処理でエラーが発生しました。[" + data.getRowNo() + "]件目" + CR 360 + ((updCnt == 1) ? 361 " update=[" + update + "]" 362 : " insert=[" + insert + "]" + CR 363 + " insertTable=[" + insertTable + "]" ) 364 + CR 365 + "errCode=[" + ex.getErrorCode() + "] State=[" + ex.getSQLState() + "]" + CR 366 + "data=[" + data.dataLine() + "]" + CR ; // 5.7.2.2 (2014/01/24) エラー時にデータも出力します。 367 throw new RuntimeException( errMsg,ex ); 368 } 369 return data; 370 } 371 372 /** 373 * 内部で使用する PreparedStatement を作成します。 374 * 引数指定の SQL または、LineModel から作成した SQL より構築します。 375 * 376 * @og.rev 5.1.2.0 (2010/01/01) setObject に ParameterMetaData の getParameterType を渡す。(PostgreSQL対応) 377 * @og.rev 5.7.2.2 (2014/01/24) SQL実行エラーを少し詳細に出力します。 378 * 379 * @param table 処理対象のテーブルID 380 * @param data ラインモデル 処理対象のLineModel 381 */ 382 private void makePrepareStatement( final String table,final LineModel data ) { 383 if( insert == null ) { 384 StringBuilder buf = new StringBuilder(); 385 String[] names = data.getNames(); 386 int size = names.length; 387 388 buf.append( "INSERT INTO " ).append( table ).append( " (" ); 389 buf.append( names[0] ); 390 for( int i=1; i<size; i++ ) { 391 buf.append( "," ).append( names[i] ); 392 } 393 buf.append( " ) VALUES ( ?" ); 394 for( int i=1; i<size; i++ ) { 395 buf.append( ",?" ); 396 } 397 buf.append( " )" ); 398 insert = buf.toString(); 399 400 // カラム番号を設定します。 401 insClmNos = new int[size]; 402 for( int i=0; i<size; i++ ) { 403 insClmNos[i] = i; 404 } 405 } 406 else { 407 Formatter format = new Formatter( data ); 408 format.setFormat( insert ); 409 insert = format.getQueryFormatString(); 410 insClmNos = format.getClmNos(); 411 } 412 413 Formatter format = new Formatter( data ); 414 format.setFormat( update ); 415 update = format.getQueryFormatString(); 416 updClmNos = format.getClmNos(); 417 418 try { 419 insPstmt = connection.prepareStatement( insert ); 420 updPstmt = connection.prepareStatement( update ); 421 // 5.1.2.0 (2010/01/01) setObject に ParameterMetaData の getParameterType を渡す。(PostgreSQL対応) 422 if( useParamMetaData ) { 423 insPmeta = insPstmt.getParameterMetaData(); 424 updPmeta = updPstmt.getParameterMetaData(); 425 } 426 } 427 catch (SQLException ex) { 428 // 5.7.2.2 (2014/01/24) SQL実行エラーを少し詳細に出力します。 429 String errMsg = "PreparedStatement を取得できませんでした。" + CR 430 + "errMsg=[" + ex.getMessage() + "]" + CR 431 + "errCode=[" + ex.getErrorCode() + "] State=[" + ex.getSQLState() + "]" + CR 432 + "insert=[" + insert + "]" + CR 433 + "update=[" + update + "]" + CR 434 + "table=[" + table + "]" + CR 435 + "nameLine=[" + data.nameLine() + "]" + CR 436 + "data=[" + data.dataLine() + "]" + CR ; 437 throw new RuntimeException( errMsg,ex ); 438 } 439 } 440 441 /** 442 * プロセスの処理結果のレポート表現を返します。 443 * 処理プログラム名、入力件数、出力件数などの情報です。 444 * この文字列をそのまま、標準出力に出すことで、結果レポートと出来るような 445 * 形式で出してください。 446 * 447 * @return 処理結果のレポート 448 */ 449 public String report() { 450 String report = "[" + getClass().getName() + "]" + CR 451 + TAB + "DBID : " + dbid + CR 452 + TAB + "Input Count : " + count + CR 453 + TAB + "Update Count : " + updCount + CR 454 + TAB + "Insert Count : " + insCount ; 455 456 return report ; 457 } 458 459 /** 460 * このクラスの使用方法を返します。 461 * 462 * @return このクラスの使用方法 463 */ 464 public String usage() { 465 StringBuilder buf = new StringBuilder(); 466 467 buf.append( "Process_DBMerge は、UPDATE と INSERT を指定し データベースを追加更新" ).append( CR ); 468 buf.append( "する、ChainProcess インターフェースの実装クラスです。" ).append( CR ); 469 buf.append( "上流(プロセスチェインのデータは上流から下流へと渡されます。)から" ).append( CR ); 470 buf.append( "受け取った LineModel を元に、データベースの存在チェックを行い、" ).append( CR ); 471 buf.append( "下流への処理を振り分けます。" ).append( CR ); 472 buf.append( CR ); 473 buf.append( "データベース接続先等は、ParamProcess のサブクラス(Process_DBParam)に" ).append( CR ); 474 buf.append( "設定された接続(Connection)を使用します。" ).append( CR ); 475 buf.append( CR ); 476 buf.append( "引数文字列中に空白を含む場合は、ダブルコーテーション(\"\") で括って下さい。" ).append( CR ); 477 buf.append( "引数文字列の 『=』の前後には、空白は挟めません。必ず、-key=value の様に" ).append( CR ); 478 buf.append( "繋げてください。" ).append( CR ); 479 buf.append( CR ); 480 buf.append( "SQL文には、{@DATE.YMDH}等のシステム変数が使用できます。" ).append( CR ); 481 buf.append( CR ).append( CR ); 482 buf.append( getArgument().usage() ).append( CR ); 483 484 return buf.toString(); 485 } 486 487 /** 488 * このクラスは、main メソッドから実行できません。 489 * 490 * @param args コマンド引数配列 491 */ 492 public static void main( final String[] args ) { 493 LogWriter.log( new Process_DBMerge().usage() ); 494 } 495}