Class: AlWorker

Inherits:
Object
  • Object
show all
Defined in:
lib/al_worker.rb

Overview

ワーカースーパークラス

Defined Under Namespace

Modules: Debug, IpcAction Classes: BroadcastMessage, Fd, Ipc, IpcClient, NumberedMessage, Program, Tcp, Timer

Constant Summary collapse

DEFAULT_WORKDIR =
"/tmp"
DEFAULT_NAME =
"al_worker"
LOG_SEVERITY =
{ :fatal=>Logger::FATAL, :error=>Logger::ERROR,
:warn=>Logger::WARN, :info=>Logger::INFO, :debug=>Logger::DEBUG }
@@log =

Returns ロガー.

Returns:

  • (Logger)

    ロガー

nil
@@mutex_sync =

Returns 同期実行用mutex.

Returns:

  • (Mutex)

    同期実行用mutex

Mutex.new

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(name = nil, workdir = nil) ⇒ AlWorker

constructor

Parameters:

  • name (String) (defaults to: nil)

    識別名



166
167
168
169
170
171
172
173
174
175
# File 'lib/al_worker.rb', line 166

def initialize( name = nil, workdir = nil )
  @values = {}
  @values_rwlock = Sync.new
  @name = name || DEFAULT_NAME
  @workdir = workdir || DEFAULT_WORKDIR
  @state = ""
  @pid_filename = File.join( @workdir, @name ) + ".pid"

  Signal::trap( :QUIT, proc{ signal_quit } )
end

Instance Attribute Details

#log_filenameString

Returns ログファイル名(フルパス).

Returns:

  • (String)

    ログファイル名(フルパス)



142
143
144
# File 'lib/al_worker.rb', line 142

def log_filename
  @log_filename
end

#nameString (readonly)

Returns ユニークネーム.

Returns:

  • (String)

    ユニークネーム



145
146
147
# File 'lib/al_worker.rb', line 145

def name
  @name
end

#pid_filenameString

Returns pidファイル名(フルパス).

Returns:

  • (String)

    pidファイル名(フルパス)



139
140
141
# File 'lib/al_worker.rb', line 139

def pid_filename
  @pid_filename
end

#privilegeString

Returns 実行権限ユーザ名.

Returns:

  • (String)

    実行権限ユーザ名



151
152
153
# File 'lib/al_worker.rb', line 151

def privilege
  @privilege
end

#program_nameString

Returns 現在実行中のRubyスクリプトの名前を表す文字列 $PROGRAM_NAME.

Returns:

  • (String)

    現在実行中のRubyスクリプトの名前を表す文字列 $PROGRAM_NAME



148
149
150
# File 'lib/al_worker.rb', line 148

def program_name
  @program_name
end

#software_watchdogBoolean

Returns ソフトウェアウォッチドッグ機能を使用.

Returns:

  • (Boolean)

    ソフトウェアウォッチドッグ機能を使用



157
158
159
# File 'lib/al_worker.rb', line 157

def software_watchdog
  @software_watchdog
end

#stateString (readonly)

Returns ステート(ステートマシン用).

Returns:

  • (String)

    ステート(ステートマシン用)



154
155
156
# File 'lib/al_worker.rb', line 154

def state
  @state
end

#valuesHash

Returns 外部提供を目的とする値のHash IPCの関係でキーは文字列のみとする。.

Returns:

  • (Hash)

    外部提供を目的とする値のHash IPCの関係でキーは文字列のみとする。



130
131
132
# File 'lib/al_worker.rb', line 130

def values
  @values
end

#values_rwlockSync (readonly)

Returns @values の reader writer lock.

Returns:

  • (Sync)

    @values の reader writer lock



133
134
135
# File 'lib/al_worker.rb', line 133

def values_rwlock
  @values_rwlock
end

#workdirString

Returns ワークファイルの作成場所.

Returns:

  • (String)

    ワークファイルの作成場所



136
137
138
# File 'lib/al_worker.rb', line 136

def workdir
  @workdir
end

Class Method Details

.log(*args) ⇒ Logger

ログ出力

Parameters:

  • msg (String, Object)

    エラーメッセージ

  • severity (Symbol)

    ログレベル :fatal, :error …

  • progname (String)

    プログラム名

Returns:

  • (Logger)

    Loggerオブジェクト



51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/al_worker.rb', line 51

def self.log( *args )
  return nil  if ! @@log
  return @@log  if args.empty?

  msg,severity,progname = *args
  s = LOG_SEVERITY[ severity ]

  case msg
  when String
    @@log.add(s || Logger::INFO, msg, progname)

  when Exception
    @@log.add(s || Logger::ERROR, "#{msg.class} / #{msg.message}", progname)
    @@log.add(s || Logger::ERROR, "BACKTRACE: \n  " + msg.backtrace.join("\n  ") + "\n", progname)

  else
    @@log.add(s || Logger::INFO, msg.inspect, progname)
  end

  return @@log
end

.mutex_syncObject

同期実行用mutexのアクセッサ



37
38
39
# File 'lib/al_worker.rb', line 37

def self.mutex_sync()
  return @@mutex_sync
end

.na(method_name) ⇒ Object

Note:

ステートマシンで無視するイベントの記述

クラス定義中に、na :state_XXX_event_YYY の様に記述する。



123
124
125
# File 'lib/al_worker.rb', line 123

def self.na( method_name )
  define_method( method_name ) { |*args| }
end

.parse_request(req) ⇒ String, Hash

IPC定形リクエストからコマンドとパラメータを解析・取り出し

Parameters:

  • req (String)

    リクエスト

Returns:

  • (String)

    コマンド

  • (Hash)

    パラメータ



81
82
83
84
85
86
87
# File 'lib/al_worker.rb', line 81

def self.parse_request( req )
  (cmd,param) = req.split( " ", 2 )
  return cmd,{}  if param == nil
  param.strip!
  return cmd,{}  if param.empty?
  return cmd,( JSON.parse( param ) rescue { ""=>param } )
end

.reply(sock, st_code, st_msg, val = nil) ⇒ True

Note:

IPC定形リプライ

定形リプライフォーマット

 (ステータスコード) "200. Message"
 (JSONデータ)       { .... }
JSONデータは付与されない場合がある。
その判断は、ステータスコードの数字直後のピリオドの有無で行う。

Parameters:

  • sock (Socket)

    返信先ソケット

  • st_code (Integer)

    ステータスコード

  • st_msg (String)

    ステータスメッセージ

  • val (Hash) (defaults to: nil)

    リプライデータ

Returns:

  • (True)


105
106
107
108
109
110
111
112
113
114
# File 'lib/al_worker.rb', line 105

def self.reply( sock, st_code, st_msg, val = nil )
  sock.puts ("%03d" % st_code) + (val ? ". " : " ") + st_msg
  if val
    sock.puts val.to_json, ""
  end
  return true

rescue Errno::EPIPE
  Thread.exit
end

Instance Method Details

#daemonObject

デーモンになって実行



441
442
443
444
445
446
447
# File 'lib/al_worker.rb', line 441

def daemon()
  if @flag_debug
    run()
  else
    run( :daemon )
  end
end

#get_value(key) ⇒ Object

Note:

valueのゲッター タイムアウトなし(単一値)

値はdupして返す。

Parameters:

  • key (String)

    キー

Returns:

  • (Object)



250
251
252
253
254
# File 'lib/al_worker.rb', line 250

def get_value( key )
  @values_rwlock.synchronize( Sync::SH ) {
    return @values[ key.to_s ].dup rescue @values[ key.to_s ]
  }
end

#get_value_wt(key, timeout = 1) ⇒ Object, Boolean

Note:

valueのゲッター タイムアウト付き(単一値)

値はdupして返す。

Parameters:

  • key (String)

    キー

  • timeout (Numeric) (defaults to: 1)

    タイムアウト時間

Returns:

  • (Object)

  • (Boolean)

    ロック状態



287
288
289
290
291
292
293
294
295
296
297
298
299
# File 'lib/al_worker.rb', line 287

def get_value_wt( key, timeout = 1 )
  locked = false
  (timeout * 10).times {
    locked = @values_rwlock.try_lock( Sync::SH )
    break if locked
    sleep 0.1
  }

  return (@values[ key.to_s ].dup rescue @values[ key.to_s ]), locked

ensure
  @values_rwlock.unlock( Sync::SH ) if locked
end

#get_values(keys) ⇒ Hash

Note:

valueのゲッター タイムアウトなし(複数値)

値はdupするが、簡素化のためにディープコピーは行っていない。 文字列では問題ないが、配列などが格納されている場合は注意が必要。

Parameters:

  • keys (Array)

    キーの配列

Returns:

  • (Hash)



266
267
268
269
270
271
272
273
274
# File 'lib/al_worker.rb', line 266

def get_values( keys )
  ret = {}
  @values_rwlock.synchronize( Sync::SH ) {
    keys.each do |k|
      ret[ k.to_s ] = @values[ k.to_s ].dup rescue @values[ k.to_s ]
    end
  }
  return ret
end

#get_values_json(key = nil) ⇒ String

valueのゲッター JSON版 タイムアウトなし

Parameters:

  • key (String, Array) (defaults to: nil)

    取得する値のキー文字列

Returns:

  • (String)

    保存されている値のJSON文字列



338
339
340
341
342
343
344
345
346
347
# File 'lib/al_worker.rb', line 338

def get_values_json( key = nil )
  @values_rwlock.synchronize( Sync::SH ) {
    if key.class == Array
      ret = {}
      key.each { |k| ret[ k ] = @values[ k ] }
      return ret.to_json
    end
    return ( key ? { key => @values[key] } : @values ).to_json
  }
end

#get_values_json_wt(key = nil, timeout = nil) ⇒ String, Boolean

valuesのゲッター JSON版 タイムアウト付き

Parameters:

  • key (String, Array) (defaults to: nil)

    取得する値のキー文字列

  • timeout (Numeric) (defaults to: nil)

    タイムアウト時間

Returns:

  • (String)

    保存されている値のJSON文字列

  • (Boolean)

    ロック状態



358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
# File 'lib/al_worker.rb', line 358

def get_values_json_wt( key = nil, timeout = nil )
  locked = false
  timeout ||= 1       # can't change. see AlWorker::Ipc#ipc_a_get_values_wt()
  (timeout * 10).times {
    locked = @values_rwlock.try_lock( Sync::SH )
    break if locked
    sleep 0.1
  }
  if key.class == Array
    ret = {}
    key.each { |k| ret[ k ] = @values[ k ] }
    return ret.to_json, locked
  end
  return ( key ? { key => @values[key] } : @values ).to_json, locked

ensure
  @values_rwlock.unlock( Sync::SH ) if locked
end

#get_values_wt(keys, timeout = 1) ⇒ Object, Boolean

Note:

valueのゲッター タイムアウト付き(複数値)

値はdupするが、簡素化のためにディープコピーは行っていない。 文字列では問題ないが、配列などが格納されている場合は注意が必要。

Parameters:

  • keys (Array)

    キーの配列

  • timeout (Numeric) (defaults to: 1)

    タイムアウト時間

Returns:

  • (Object)

  • (Boolean)

    ロック状態



313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
# File 'lib/al_worker.rb', line 313

def get_values_wt( keys, timeout = 1 )
  locked = false
  (timeout * 10).times {
    locked = @values_rwlock.try_lock( Sync::SH )
    break if locked
    sleep 0.1
  }

  ret = {}
  keys.each do |k|
    ret[ k.to_s ] = @values[ k.to_s ].dup rescue @values[ k.to_s ]
  end
  return ret, locked

ensure
  @values_rwlock.unlock( Sync::SH ) if locked
end

#initialize2Object

Note:

初期化2

常駐後に処理をさせるには、これをオーバライドする。



572
573
# File 'lib/al_worker.rb', line 572

def initialize2()
end

#load_values(filename = nil) ⇒ Object

値(@values)読み込み



406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
# File 'lib/al_worker.rb', line 406

def load_values( filename = nil )
  filename ||= File.join( @workdir, @name ) + ".values"
  digest = Digest::SHA1.file( filename ) rescue nil
  return nil if ! digest      # same as file not found.

  digestfile = File.join( File.dirname(filename), File.basename(filename,".*") ) + ".sha1"
  digestfile_value = File.read( digestfile ) rescue nil
  if digestfile_value
    return nil  if digest != digestfile_value
  end

  json = ""
  File.open( filename, "r" ) { |f|
    while txt = f.gets
      break if txt == "VALUES: \n"
    end
    if txt == "VALUES: \n"
      while txt = f.gets
        json << txt
      end
    end
  }
  return nil  if json == ""
  begin
    @values = JSON.parse( json )
    return true
  rescue
    return false
  end
end

#log(*args) ⇒ Object

ログ出力

See Also:

  • log()


581
582
583
# File 'lib/al_worker.rb', line 581

def log( *args )
  AlWorker.log( *args )
end

#no_method_error(event) ⇒ Object

メソッドエラーの場合のエラーハンドラ



636
637
638
# File 'lib/al_worker.rb', line 636

def no_method_error( event )
  raise "No action defined. state: #{@state}, event: #{event}"
end

#parse_option(argv = ARGV) ⇒ Object

基本的なオプションの解析



181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
# File 'lib/al_worker.rb', line 181

def parse_option( argv = ARGV )
  i = 0
  while i < argv.size
    case argv[i]
    when "-d"                 # debug mode
      @flag_debug = true
    when "-k"                 # kill stay process.
      @flag_kill = true
    when "-r"                 # restart process.
      @flag_restart = true
    when "-p"                 # specify pid filename
      @pid_filename = argv[i += 1]
    when "-l"                 # specify log filename
      @log_filename = argv[i += 1]
    end
    i += 1
  end
end

#reply(sock, st_code, st_msg, val = nil) ⇒ Object

IPC定形リプライ

See Also:

  • reply()


591
592
593
# File 'lib/al_worker.rb', line 591

def reply( sock, st_code, st_msg, val = nil )
  AlWorker.reply( sock, st_code, st_msg, val )
end

#run(*modes) ⇒ Object

実行開始

Parameters:

  • modes (Symbol)

    動作モード nul デーモンにならずに実行 :daemon デーモンで実行 :nostop デーモンにならずスリープもしない :nopid プロセスIDファイルを作らない :nolog ログファイルを作らない :exit_idle_task アイドルタスクが終了したら

    プロセスも終了する
    


461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
# File 'lib/al_worker.rb', line 461

def run( *modes )
  # 実効権限変更(放棄)
  if @privilege
    uid = Etc.getpwnam( @privilege ).uid
    Process.uid = uid
    Process.euid = uid
  end

  # 停止 or 再実行?
  if @flag_kill || @flag_restart
    begin
      pid = File.read( @pid_filename ).to_i
      Process.kill( "TERM", pid )
    rescue Errno::ENOENT
      puts "Error: No pid file. '#{@pid_filename}'"
    rescue Errno::ESRCH
      puts "Error: No such pid=#{pid} process."
    rescue Errno::EPERM
      puts "Error: Operation not permitted for pid=#{pid} process."
    end

    exit(0)  if @flag_kill
    sleep 1
  end

  # ログ準備
  if modes.include?( :nolog )
    @@log == nil
  elsif @@log == nil
    @log_filename ||= File.join( @workdir, @name ) + ".log"
    @@log = Logger.new( @log_filename, 3 )
    @@log.level = @flag_debug ? Logger::DEBUG : Logger::INFO
  end

  if ! modes.include?( :nopid )
    # 実行可/不可確認
    if File.directory?( @pid_filename )
      puts "ERROR: @pid_filename is directory."
      exit( 64 )
    end
    if File.exist?( @pid_filename )
      puts "ERROR: Still work."
      exit( 64 )
    end

    # プロセスIDファイル作成
    # (note) pid作成エラーの場合は、daemonになる前にここで検出される。
    File.open( @pid_filename, "w" ) { |file| file.write( Process.pid ) }
  end

  # 常駐処理
  if modes.include?( :daemon )
    Process.daemon()
    # プロセスIDファイル再作成
    if ! modes.include?( :nopid )
      File.open( @pid_filename, "w" ) { |file| file.write( Process.pid ) }
    end
  end
  if @program_name
    $PROGRAM_NAME = @program_name
  end

  # 終了時処理
  at_exit {
    if ! modes.include?( :nopid )
      File.unlink( @pid_filename ) rescue 0
    end
    AlWorker.log( "finish", :info, @name )
  }

  # 初期化2
  AlWorker.log( "start", :info, @name )
  begin
    initialize2()
  rescue Exception => ex
    raise ex  if ex.class == SystemExit
    AlWorker.log( ex )
    raise ex  if STDERR.isatty
    exit( 64 )
  end

  # アイドルタスク
  if respond_to?( :idle_task, true )
    Thread.start {
      Thread.current.priority -= 1
      begin
        idle_task()
      rescue Exception => ex
        raise ex  if ex.class == SystemExit
        AlWorker.log( ex )
        if STDERR.isatty
          STDERR.puts ex.to_s
          STDERR.puts ex.backtrace.join("\n") + "\n"
        end
      end
      exit  if modes.include?( :exit_idle_task )
    }
  end

  # メインスレッド停止
  return  if modes.include?( :nostop )
  sleep
end

#save_valuesObject

Note:

値(@values)保存

排他処理なし。 バックアップファイルを3つまで作成する。



385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
# File 'lib/al_worker.rb', line 385

def save_values()
  filename = File.join( @workdir, @name ) + ".values"
  File.rename( filename + ".bak2", filename + ".bak3" ) rescue 0
  File.rename( filename + ".bak1", filename + ".bak2" ) rescue 0
  File.rename( filename,           filename + ".bak1" ) rescue 0

  File.open( filename, "w" ) { |f|
    f.puts "DATE: #{Time.now}"
    f.puts "NAME: #{@name}"
    f.puts "SELF: #{self.inspect}"
    f.puts "VALUES: \n#{@values.to_json}"
  }
  File.open( File.join( @workdir, @name ) + ".sha1", "w" ) { |file|
    file.write( Digest::SHA1.file( filename ) )
  }
end

#set_state(state) ⇒ Object Also known as: state=, next_state

現在のステートを宣言する

Parameters:

  • state (String)

    ステート文字列



646
647
648
649
# File 'lib/al_worker.rb', line 646

def set_state( state )
  @state = state.to_s
  AlWorker.log( "change state to #{@state}", :debug, @name )
end

#set_value(key, val) ⇒ Object

valueのセッター(単一値)

Parameters:

  • key (String)

    キー

  • val (Object)



227
228
229
# File 'lib/al_worker.rb', line 227

def set_value( key, val )
  @values_rwlock.synchronize( Sync::EX ) { @values[ key.to_s ] = val }
end

#set_values(values) ⇒ Object

valueのセッター(複数値)

Parameters:

  • values (Hash)

    セットする値



237
238
239
# File 'lib/al_worker.rb', line 237

def set_values( values )
  @values_rwlock.synchronize( Sync::EX ) { @values.merge!( values ) }
end

#signal_quitObject

Note:

シグナルハンドラ SIGQUIT

デバグ用

状態をファイルに書き出す。
画面があれば、表示する。


209
210
211
212
213
214
215
216
217
218
# File 'lib/al_worker.rb', line 209

def signal_quit()
  save_values()

  if STDOUT.isatty
    puts "\n===== @values ====="
    @values.keys.sort.each do |k|
      puts "#{k}=> #{@values[k]}"
    end
  end
end

#trigger_event(event, *args) ⇒ Object

ステートマシン 実行メソッド割り当て

Parameters:

  • event (String)

    イベント名

  • args (Array)

    引数



602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
# File 'lib/al_worker.rb', line 602

def trigger_event( event, *args )
  @respond_to = "from_#{@state}_event_#{event}"
  if respond_to?( @respond_to )
    AlWorker.log( "st:#{@state} ev:#{event} call:#{@respond_to}", :debug, @name )
    return __send__( @respond_to, *args )
  end

  @respond_to = "state_#{@state}_event_#{event}"
  if respond_to?( @respond_to )
    AlWorker.log( "st:#{@state} ev:#{event} call:#{@respond_to}", :debug, @name )
    return __send__( @respond_to, *args )
  end

  @respond_to = "event_#{event}"
  if respond_to?( @respond_to )
    AlWorker.log( "st:#{@state} ev:#{event} call:#{@respond_to}", :debug, @name )
    return __send__( @respond_to, *args )
  end

  @respond_to = "state_#{@state}"
  if respond_to?( @respond_to )
    AlWorker.log( "st:#{@state} ev:#{event} call:#{@respond_to}", :debug, @name )
    return __send__( @respond_to, *args )
  end

  # 実行すべきメソッドが見つからない場合
  @respond_to = ""
  no_method_error( event )
end