Command output capture line by line?

New Crystalliser, reasonably polyglot. I have some very large log files that I want to process line by line.

Some of them are straightforward: io = File.open, line = io.read_line.

Some are compressed with e.g. xz. In Perl I’d do this by spawning an xzcat process and reading its stdout.

The best I’ve managed so far is to use an IO::Memory, but I don’t want to store the whole output of the xzcat in memory, just one line at a time. (Or whatever back-end buffer is needed based on read size, fair enough.)

Obviously I want the standard file reader and the compressed file reader to produce identical streams that can be fed through the same processing loop.

Any suggestions?

I don’t think xz is supported via the stdlib’s Compress namespace, but there does appear to be a shard for it: naqvis/xz.cr (Crystal bindings to the xz (lzma) compression library). The Compress types, including the one from the shard, expose the decompressed data as an IO. So you could do something like this:

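# Needs the naqvis/xz.cr shard listed in shard.yml.
require "xz"

File.open("file.xz") do |file|
  Compress::XZ::Reader.open(file) do |xz|
    # each_line reads lazily, one line at a time.
    xz.each_line do |line|
      pp line
    end
  end
end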

And this should print each line of the decompressed data without having it all in memory.


As an alternative to the xz shard, you can also run xzcat and read its output.

Process.run("xzcat", ["file.xz"], output: :pipe) do |process|
  process.output.each_line do |line|
    pp line
  end
end

Thanks both; since I don’t need this to run outside Linux, my assumption was that I’d use a subprocess, but I hadn’t found the details of how to make that work.

To give a bit of context, what I’m actually trying to do is take a bunch of Apache logfiles (from separate servers, all with accurate timestamps) and put them into a single file in chronological order. My approach to this is basically “open every file as a stream, parse the first line of each, output the line with earliest timestamp and read a new line from that file, continue, closing each stream when I run out of lines”.
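As a rough illustration of that merge loop, here is a minimal sketch that uses in-memory streams in place of real log files. The `Head` record, the `timestamp_of` helper and the sample log lines are all made up for the example; it only assumes that every line carries a bracketed Apache-style timestamp.

```crystal
# Hypothetical helper: extracts the timestamp from an Apache access-log line.
def timestamp_of(line : String) : Time
  m = line.match(/\[([^\]]+)\]/) || raise "no timestamp in: #{line}"
  Time.parse!(m[1], "%d/%b/%Y:%H:%M:%S %z")
end

# Hypothetical record: the current (not yet emitted) line of one stream.
record Head, time : Time, line : String, io : IO

# Two in-memory "log files", each already in chronological order.
a = IO::Memory.new(<<-LOG)
  host1 - - [10/Oct/2023:13:55:36 +0000] "GET /a HTTP/1.1" 200 1
  host1 - - [10/Oct/2023:13:55:40 +0000] "GET /c HTTP/1.1" 200 1
  LOG
b = IO::Memory.new(<<-LOG)
  host2 - - [10/Oct/2023:13:55:38 +0000] "GET /b HTTP/1.1" 200 1
  host2 - - [10/Oct/2023:13:55:41 +0000] "GET /d HTTP/1.1" 200 1
  LOG

# Prime the merge with the first line of every non-empty stream.
heads = [] of Head
[a, b].each do |io|
  if line = io.gets
    heads << Head.new(timestamp_of(line), line, io)
  end
end

merged = [] of String
until heads.empty?
  # Emit the earliest pending line, then refill from the same stream.
  i = (0...heads.size).min_by { |n| heads[n].time }
  head = heads[i]
  merged << head.line
  if nxt = head.io.gets
    heads[i] = Head.new(timestamp_of(nxt), nxt, head.io)
  else
    head.io.close
    heads.delete_at(i)
  end
end

merged.each { |line| puts line }
```

Only one line per stream is held at a time, so memory use depends on the number of inputs rather than their size.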

This doesn’t seem to be compatible with the Compress::XZ::Reader code above, because I need to stow that IO away in a list rather than process it all at once, so the open-do form isn’t the pattern I need.
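For what it’s worth, the stdlib Compress readers also have a non-block constructor that returns the reader itself, which can then be kept in a list. The sketch below uses the stdlib’s Compress::Gzip::Reader with an in-memory buffer standing in for a .gz file; I’d assume the reader from the xz shard offers a similar `new`, but I haven’t verified that.

```crystal
require "compress/gzip"

# Build a small gzip-compressed buffer in memory to stand in for a .gz log file.
compressed = IO::Memory.new
Compress::Gzip::Writer.open(compressed) do |gz|
  gz.puts "gz line 1"
  gz.puts "gz line 2"
end
compressed.rewind

plain = IO::Memory.new("plain line 1\nplain line 2\n")

# Non-block constructors return the reader, so plain and compressed
# inputs can sit side by side in one array of IO.
streams = [] of IO
streams << plain
streams << Compress::Gzip::Reader.new(compressed, sync_close: true)

# Pull one line from each stream in turn until all are exhausted.
collected = [] of String
until streams.empty?
  streams.reject! do |io|
    if line = io.gets
      collected << line
      false
    else
      io.close
      true
    end
  end
end

collected.each { |line| puts line }
```

With `sync_close: true`, closing the reader also closes the underlying IO, which saves tracking the two separately.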

The crude code I’ve come up with for this is here. Probably I should move getnextline to be an instance method. But it seems to be working.

module Logmerge
  VERSION = "0.1.0"

  class Logfile
    getter io : IO
    getter done : Bool
    getter timestamp : Time
    getter line : String

    def initialize(@io)
      @done, @timestamp, @line = getnextline(io)
    end
    def cont()
      @done, @timestamp, @line = getnextline(@io)
    end
  end

  logs = Array(Logfile).new
  ARGV.each do |filename|
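    # Pick a reader based on the file extension; every branch yields an IO.
    # (The original /.xz$/ left the dot unescaped, so it matched any character.)
    io = if filename.ends_with?(".xz")
           Process.new("xzcat", [filename], output: :pipe).output
         elsif filename.ends_with?(".gz")
           Process.new("zcat", [filename], output: :pipe).output
         else
           File.open(filename)
         end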
    l = Logfile.new(io)
    # Skip inputs that are already exhausted (e.g. empty files).
    logs.push(l) unless l.done
  end

  until logs.empty?
    # Pick the stream whose current line has the earliest timestamp.
    # (Starting the search from Time.utc would loop forever on entries
    # dated in the future.)
    log = logs.min_by(&.timestamp)
    puts log.line
    log.cont
    logs.delete(log) if log.done
  end
end

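# Reads the next line from io and returns {done, timestamp, line}.
def getnextline(io : IO)
  line = ""
  begin
    line = io.read_line
  rescue IO::Error
    # End of input (IO::EOFError) or an already-closed stream: mark as done.
    io.close
    return true, Time.utc, ""
  end
  # Apache timestamps look like [10/Oct/2000:13:55:36 -0700].
  m = line.match(/\[(\d+\/(Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)\/\d{4}:\d{2}:\d{2}:\d{2} [-+]\d{4})\]/)
  raise "no timestamp found in line: #{line}" unless m
  ts = Time.parse!(m[1], "%d/%b/%Y:%H:%M:%S %z")
  return false, ts, line
end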
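
One thing that might be worth tidying up: calling `.output` on `Process.new` and dropping the Process object means the child is never waited on. A minimal sketch of one way around that is to keep the process and its pipe together. `CommandStream` is a hypothetical name, and `printf` stands in for xzcat so the example runs without a compressed file.

```crystal
# Hypothetical wrapper: pairs a child process with its piped stdout.
class CommandStream
  getter output : IO

  def initialize(command : String, args : Array(String))
    @process = Process.new(command, args, output: :pipe)
    @output = @process.output
  end

  # Reads the next line, or returns nil at end of output.
  def gets : String?
    @output.gets
  end

  # Closes the pipe and waits for the child, returning its exit status.
  def close : Process::Status
    @output.close
    @process.wait
  end
end

# printf interprets the \n escapes itself, producing two lines.
stream = CommandStream.new("printf", ["first\\nsecond\\n"])
lines = [] of String
while line = stream.gets
  lines << line
end
status = stream.close

puts lines
puts status.success?
```

Checking the exit status also gives a way to notice a truncated or corrupt archive, which would otherwise just look like an early end of input.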