001/* 002 * Copyright (c) 2009 The openGion Project. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, 013 * either express or implied. See the License for the specific language 014 * governing permissions and limitations under the License. 015 */ 016package org.opengion.hayabusa.report2; 017 018import java.io.File; 019import java.util.ArrayList; 020import java.util.Collections; 021import java.util.List; 022 023import org.opengion.fukurou.model.FileOperation; 024import org.opengion.fukurou.util.FileUtil; 025import org.opengion.fukurou.util.StringUtil; 026import org.opengion.hayabusa.common.HybsSystem; 027import org.opengion.hayabusa.io.HybsFileOperationFactory; 028 029/** 030 * 帳票要求スレッドの本体です。 031 * 外部からスタックされたキューを先入れ先出しの順番に処理します。 032 * 033 * あるキューに対してエラーが発生すると、システムリソースのRETRY_COUNTで設定された回数再処理を試みます。 034 * この回数分エラーが発生した場合は、そのキューのみがアプリエラーとなります。 035 * 036 * このスレッドは一度生成されると、外部から明示的に終了の要求を起こさない限り生存し続けます。 037 * 終了するには、finish()メソッドを呼び出します。 038 * このメソッドが呼ばれると、内部でスタックしているキューは全てクリアされるため、その時点で 039 * 処理されているキューの処理が完了した時点で、スレッドが終了します。 040 * 041 * @og.group 帳票システム 042 * 043 * @version 4.0 044 * @author Hiroki.Nakamura 045 * @since JDK1.6 046 */ 047public class ExecThread extends Thread { 048 049 private static enum Status { EXECUTE, WAIT }; 050 private Status state = Status.EXECUTE; 051 052 private static final int RETRY_COUNT = HybsSystem.sysInt( "REPORT_RETRY_COUNT" ); 053 054 private final List<ExecQueue> queues = Collections.synchronizedList( new ArrayList<ExecQueue>() ); 055 056 private long threadStart = 0; 057 private long execStart = 0; 058 private long execEnd = 0; 059 private final boolean debug; // 4.3.0.0 (2008/07/15) デバッグの追加 060 061 /** 062 * コンストラクタ 063 * OOoへの接続を生成します。 064 * 065 * @param id スレッドID 066 */ 067 public ExecThread( final String id ) { 068 // threadStart = System.currentTimeMillis(); 069 // setName( id ); // スタックトレース時にスレッドIDを出すためにセット 070 this ( id , false ); 071 } 072 073 /** 074 * コンストラクタ 075 * OOoへの接続を生成します。 076 * 077 * @og.rev 4.3.0.0 (2008/07/15) デバッグフラグを追加します。 078 * @param id スレッドID 079 * @param debugFlag デバッグフラグ[true/false] 080 */ 081 public ExecThread( final String id , final boolean debugFlag ) { 082 threadStart = System.currentTimeMillis(); 083 setName( id ); // スタックトレース時にスレッドIDを出すためにセット 084 debug = debugFlag; // 4.2.5.0 (2008/06/26) デバッグ処理の追加 085 } 086 087 /** 088 * キューをスタックします。 089 * 090 * @og.rev 4.3.0.0 (2008/07/15) debug追加 091 * @param queue ExecQueueオブジェクト 092 * 093 * @return スタックが受け付けられたかどうか 094 */ 095 public boolean stackQueue( final ExecQueue queue ) { 096 queue.addMsg( "[INFO]QUEUE STACK:THREAD-ID=" + queue.getThreadId() + ",YKNO=" + queue.getYkno() + HybsSystem.CR ); 097 098 queues.add( queue ); 099 100 queue.setExecute(); 101 if( debug ) { queue.addMsg( "[INFO]QUEUE STACKED" + HybsSystem.CR ); } 102 103 synchronized( this ) { 104 if( state == Status.WAIT ) { 105 this.interrupt(); 106 if( debug ) { queue.addMsg( "[INFO]INTERRUPT" + HybsSystem.CR ); } 107 } 108 } 109 return true; 110 } 111 112 /** 113 * スレッド本体 114 * スタックされたキューを順番に取り出し処理を行います。 115 */ 116 @Override 117 public void run() { 118 119 while( true ) { 120 121 synchronized( this ) { 122 while( queues.isEmpty() ) { 123 try { 124 state = Status.WAIT; 125 wait(); 126 } 127 catch( InterruptedException ex ) { 128 state = Status.EXECUTE; 129 } 130 } 131 } 132 133 ExecQueue queue = popQueue(); 134 if( queue != null ) { 135 if( "_FINALIZE".equals( queue.getYkno() ) ) { 136 if( debug ) { queue.addMsg( "[INFO]END" + HybsSystem.CR ); } 137 break; 138 } 139 else { 140 if( debug ) { queue.addMsg( "[INFO]QUEUE START" + HybsSystem.CR ); } 141 exec( queue ); 142 143 // oota tmp 144 FileOperation file = HybsFileOperationFactory.create(queue.getStorageType(), queue.getBucketName(), queue.getOutputName()); 145 if(!file.isLocal()) { 146 File localFile = new File(queue.getOutputName()); 147 FileUtil.copy(localFile, file); 148 localFile.delete(); 149 } 150 151 // System.out.println( queue.getMsg() ); 152 System.out.print( queue.getMsg() ); // 4.3.0.0 (2008/07/15) 153 } 154 } 155 } 156 } 157 158 /** 159 * スレッドを終了させるためのキューを追加します。 160 * 161 * このメソッドが呼ばれると、内部にスタックしているキューは全てクリアされます。 162 */ 163 public void finish() { 164 queues.clear(); 165 166 ExecQueue qu = new ExecQueue(); 167 qu.setYkno( "_FINALIZE" ); 168 stackQueue( qu ); 169 } 170 171 /** 172 * スレッドを終了させるためのキューを追加します。 173 * 174 * このメソッドでは、既にスタックされているキューはクリアされず、全て処理された後で、 175 * スレッドを終了します。 176 * 177 * @og.rev 5.1.6.0 (2010/05/01) 新規作成 178 */ 179 public void finishAfterExec() { 180 ExecQueue qu = new ExecQueue(); 181 qu.setYkno( "_FINALIZE" ); 182 stackQueue( qu ); 183 } 184 185// /** 186// * 現在処理しているキューの処理時間を返します。 187// * スレッドがWAIT状態の場合は、0を返します。 188// * 189// * @return 処理時間 190// */ 191// public int getExecTime() { 192// return ( execStart > execEnd ? (int)(System.currentTimeMillis() - execStart) : 0 ); 193// } 194 195 /** 196 * 帳票処理を行います。 197 * 198 * @og.rev 5.1.2.0 (2010/01/01) 256シートを超えた場合でも、正しく処理できるように対応 199 * 200 * @param queue ExecQueueオブジェクト 201 */ 202 private void exec( final ExecQueue queue ) { 203 execStart = System.currentTimeMillis(); 204 205 ExecProcess oep = new ExecProcess( queue, debug ); 206 for( int i = 0; i <= RETRY_COUNT; i++ ) { 207 try { 208 // 5.1.2.0 (2010/01/01) データが終わるまで処理を継続する。 209 while( !queue.isEnd() ) { 210 oep.process(); 211 } 212 queue.setComplete(); 213 break; 214 } 215 catch( Throwable th ) { 216 queue.addMsg( "[ERROR]ERROR OCCURRED!" + HybsSystem.CR ); 217 queue.addMsg( StringUtil.stringStackTrace( th ) ); 218 219 if( i == RETRY_COUNT ) { 220 queue.addMsg( "[ERROR]UPTO RETRY COUNT!" + HybsSystem.CR ); 221 queue.setError(); 222 } 223 } 224 } 225 226 execEnd = System.currentTimeMillis(); 227 } 228 229 /** 230 * キューを取り出します。 231 * 232 * @return キュー 233 */ 234 private ExecQueue popQueue() { 235 return queues.remove( 0 ); 236 } 237 238 /** 239 * このクラスの文字列表現を返します。 240 * 241 * @og.rev 4.3.0.0 (2008/07/15) debugを追加 242 * 243 * @return 文字列表現 244 */ 245 @Override 246 public String toString() { 247 StringBuilder sb = new StringBuilder(); 248 sb.append( "STATE=" ).append( state.toString() ); 249 sb.append( ", START=" ).append( HybsSystem.getDate( threadStart ) ); 250 sb.append( ", POOL=" ).append( queues.size() ); 251 sb.append( ", EXEC-START=" ).append( HybsSystem.getDate( execStart ) ); 252 sb.append( ", EXEC-END=" ).append( HybsSystem.getDate( execEnd ) ); 253 sb.append( ", DEBUG=" ).append( debug ); // 4.3.0.0 (2008/07/15) デバッグの追加 254 255 return sb.toString(); 256 } 257}