A logfile parser that reads log entries from multiple logfile streams, merges them in timestamp order, and sends them to a processor.
Create a new parser that reads from the given streams.
Provide a processor to receive the data parsed from the logfiles. The processor may be either an entry processor or a record processor. You may also pass nil for the processor; in that case, the generated log entries are not sent to a processor but are still returned by the #parse_one_entry method.
Recognized options include:
:levels
Sawmill::LevelGroup to use to parse log levels. If not specified, Sawmill::STANDARD_LEVELS is used by default.
:emit_incomplete_records_at_eof
If set to true, causes any incomplete log records to be emitted in their incomplete state when EOF is reached on all streams.
# File lib/sawmill/multi_parser.rb, line 63
def initialize(io_array_, processor_, opts_={})
  @emit_incomplete_records_at_eof = opts_.delete(:emit_incomplete_records_at_eof)
  @heap = Util::Heap.new{ |a_, b_| a_[1].timestamp <=> b_[1].timestamp }
  @queue = Util::Queue.new
  encoding_array_ = opts_.delete(:encoding_array)
  internal_encoding_array_ = opts_.delete(:internal_encoding_array)
  io_array_.each_with_index do |io_, index_|
    opts2_ = opts_.dup
    opts2_[:encoding] = encoding_array_[index_] if encoding_array_
    opts2_[:internal_encoding] = internal_encoding_array_[index_] if internal_encoding_array_
    _enqueue(Parser.new(io_, nil, opts2_))
  end
  @processor = nil
  if processor_.respond_to?(:record) && processor_.respond_to?(:extra_entry)
    @processor = RecordBuilder.new(processor_)
  elsif processor_.respond_to?(:begin_record) && processor_.respond_to?(:end_record)
    @processor = processor_
  end
  @classifier = @processor ? EntryClassifier.new(@processor) : nil
end
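The constructor above decides how to treat the processor by checking which methods it responds to. The following is a minimal, self-contained sketch of that duck-typing check. The classes DemoRecordProcessor and DemoEntryProcessor and the helper processor_kind are hypothetical names made up for illustration; they are not part of Sawmill, and the labels assume that an object responding to record and extra_entry is what the text calls a record processor.

```ruby
# Hypothetical stand-ins for illustration only; not Sawmill classes.
class DemoRecordProcessor
  def record(record_); end
  def extra_entry(entry_); end
end

class DemoEntryProcessor
  def begin_record(entry_); end
  def end_record(entry_); end
end

# Mirrors the respond_to? checks in the constructor:
# the record-style interface is tested first, then the entry-style one.
# Anything else (including nil) is treated as "no processor".
def processor_kind(processor_)
  if processor_.respond_to?(:record) && processor_.respond_to?(:extra_entry)
    :record_processor
  elsif processor_.respond_to?(:begin_record) && processor_.respond_to?(:end_record)
    :entry_processor
  else
    :none
  end
end
```

Because nil responds to none of these methods, passing nil falls through to the last branch, which corresponds to @processor staying nil in the constructor.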
Parse until EOF is reached on all streams, and emit the log entries to the processor.
# File lib/sawmill/multi_parser.rb, line 110
def parse_all
  while parse_one_entry; end
end
Parse one log entry from the streams and emit it to the processor. Also returns the log entry. Returns nil if EOF has been reached on all streams.
# File lib/sawmill/multi_parser.rb, line 89
def parse_one_entry
  entry_ = @queue.dequeue
  unless entry_
    data_ = @heap.remove
    if data_
      _enqueue(data_[0])
      entry_ = data_[1]
    else
      if @emit_incomplete_records_at_eof && @processor.respond_to?(:emit_incomplete_records)
        @processor.emit_incomplete_records
      end
    end
  end
  @classifier.entry(entry_) if entry_
  entry_
end
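The timestamp merge performed by parse_one_entry can be illustrated with a small standalone sketch. It assumes each stream is already sorted by timestamp and keeps one pending entry per stream, much like the [parser, entry] pairs stored in the heap. DemoEntry and DemoMerger are hypothetical names, and a linear scan for the minimum is swapped in for Util::Heap to keep the example short.

```ruby
# Hypothetical entry type; real Sawmill entries carry more fields.
DemoEntry = Struct.new(:timestamp, :message)

# Merges several individually sorted streams into one sequence
# ordered by timestamp.
class DemoMerger
  def initialize(streams_)
    # Each element is [enumerator, next pending entry from that enumerator].
    @pending = []
    streams_.each { |stream_| _enqueue(stream_.each) }
  end

  # Returns the earliest pending entry, or nil once every stream is exhausted.
  def next_entry
    index_ = @pending.each_index.min_by { |i_| @pending[i_][1].timestamp }
    return nil unless index_
    data_ = @pending.delete_at(index_)
    _enqueue(data_[0])  # refill from the stream that was just consumed
    data_[1]
  end

  private

  # Pulls the next entry from the enumerator, if it has one.
  def _enqueue(enum_)
    @pending << [enum_, enum_.next]
  rescue StopIteration
    # Stream exhausted; nothing more to enqueue.
  end
end

a_ = [DemoEntry.new(1, "a1"), DemoEntry.new(4, "a2")]
b_ = [DemoEntry.new(2, "b1"), DemoEntry.new(3, "b2")]
merger_ = DemoMerger.new([a_, b_])
messages_ = []
while (entry_ = merger_.next_entry)
  messages_ << entry_.message
end
messages_  # => ["a1", "b1", "b2", "a2"]
```

The draining loop at the end has the same shape as parse_all: it keeps asking for the next entry until nil signals that all streams have reached EOF. A real heap makes each step logarithmic in the number of streams rather than linear, which matters only when many streams are merged.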