New Crystalliser, reasonably polyglot. I have some very large log files that I want to process line by line.
Some of them are straightforward: io = File.open, line = io.read_line.
Some are compressed with e.g. xz. In Perl I’d do this by spawning an xzcat process and reading its stdout.
The best I’ve managed so far is to use an IO::Memory, but I don’t want to store the whole output of the xzcat in memory, just one line at a time. (Or whatever back-end buffer is needed based on read size, fair enough.)
Obviously I want the standard file reader and the compressed file reader to produce identical streams that can be fed through the same processing loop.
Any suggestions?
I don’t think xz is supported via the stdlib’s Compress namespace, but there does appear to be a lib for it: GitHub - naqvis/xz.cr: Crystal bindings to the xz (lzma) compression library. The Compress types, including the one from the shard, yield an IO of the decompressed data. So you could do something like this:

File.open("file.xz") do |file|
  Compress::XZ::Reader.open(file) do |xz|
    xz.each_line do |line|
      pp line
    end
  end
end
And this should print each line of the decompressed data without having it all in memory.
As an alternative to the xz shard, you can also run xzcat and read its output:
Process.run("xzcat", ["file.xz"], output: :pipe) do |process|
  process.output.each_line do |line|
    pp line
  end
end
Thanks both; since I don’t need this to run outside Linux, my assumption was that I’d go the subprocess route, but I hadn’t found the details of how to make that work.
To give a bit of context, what I’m actually trying to do is take a bunch of Apache logfiles (from separate servers, all with accurate timestamps) and put them into a single file in chronological order. My approach to this is basically “open every file as a stream, parse the first line of each, output the line with earliest timestamp and read a new line from that file, continue, closing each stream when I run out of lines”.
This doesn’t seem to be compatible with the Compress::XZ::Reader code above, because I need to stow away that IO in a list rather than process it all at once, so the open-with-block form isn’t the pattern I need.
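(If I read the docs right, the stdlib Compress readers also have a plain .new constructor alongside the block form of .open, so a reader can be stowed in a list and closed explicitly later; I’m assuming the xz shard follows the same convention. A minimal sketch using the stdlib gzip reader, with a made-up filename:)

```crystal
require "compress/gzip"

# Non-block form: wrap the file once and keep the reader around.
file = File.open("access_log.gz") # hypothetical filename
reader = Compress::Gzip::Reader.new(file)

# The reader is just an IO, so it can live in a list:
streams = [reader] of IO
first_line = streams.first.gets # returns String? (nil at EOF)

# Close explicitly once the stream is exhausted.
reader.close
file.close
```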
The crude code I’ve come up with for this is here. Probably I should move getnextline to be an instance method. But it seems to be working.
module Logmerge
  VERSION = "0.1.0"

  class Logfile
    getter io : IO
    getter done : Bool
    getter timestamp : Time
    getter line : String

    def initialize(@io)
      @done, @timestamp, @line = getnextline(io)
    end

    def cont
      @done, @timestamp, @line = getnextline(@io)
    end
  end

  logs = Array(Logfile).new

  ARGV.each do |filename|
    # The if-expression gives io the union type of the branches,
    # so no throwaway IO::Memory is needed. Note the escaped dots:
    # /.xz$/ would match any character before "xz".
    io = if filename =~ /\.xz$/
           Process.new("xzcat", [filename], output: :pipe).output
         elsif filename =~ /\.gz$/
           Process.new("zcat", [filename], output: :pipe).output
         else
           File.open(filename)
         end
    logs.push(Logfile.new(io))
  end

  while logs.size > 0
    # Linear scan for the stream whose current line has the earliest timestamp.
    mints = Time.utc
    active = -1
    logs.each_with_index do |log, n|
      if log.timestamp < mints
        mints = log.timestamp
        active = n
      end
    end
    if active >= 0
      puts logs[active].line
      logs[active].cont
      logs.delete_at(active) if logs[active].done
    end
  end
end

def getnextline(io : IO)
  line = ""
  begin
    line = io.read_line
  rescue IO::EOFError
    io.close
    return true, Time.utc, ""
  end
  md = line.match(/\[(\d+\/(Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)\/\d{4}:\d{2}:\d{2}:\d{2} [-+]\d{4})\]/)
  raise "no timestamp in line: #{line}" unless md
  ts = Time.parse!(md[1], "%d/%b/%Y:%H:%M:%S %z")
  return false, ts, line
end
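One wrinkle I should probably handle: nothing ever waits on the spawned xzcat/zcat processes, so finished children would linger as zombies until the program exits. A sketch (the Source record and helper names are my own invention) of keeping the Process next to its IO so it can be reaped when the stream runs out:

```crystal
# Pair each IO with the Process feeding it (nil for plain files),
# so the child can be reaped once its output is exhausted.
record Source, io : IO, process : Process? = nil

def open_source(filename : String) : Source
  case filename
  when /\.xz$/
    p = Process.new("xzcat", [filename], output: Process::Redirect::Pipe)
    Source.new(p.output, p)
  when /\.gz$/
    p = Process.new("zcat", [filename], output: Process::Redirect::Pipe)
    Source.new(p.output, p)
  else
    Source.new(File.open(filename))
  end
end

def close_source(src : Source)
  src.io.close
  src.process.try &.wait # reap the child to avoid a zombie
end
```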