Is there a way to GZIP a CSV file while writing it? I didn’t understand how to combine the IO docs with the GZIP docs with the CSV docs.
require "csv"
require "gzip"
# crystal run --release convert.cr
input_file_name = "data_before.csv"
p "converting file #{input_file_name}"
loop_count = 1 # how many copies of each row to emit
counter = 0
output_file_name = "data_after.csv"
p ({:processing => {:output_file_name => output_file_name}})
File.open(input_file_name) do |infile|
  reader = CSV.new(infile, headers: true)
  index_cust_id = reader.indices["cust_id"]
  File.open(output_file_name, "w") do |outfile|
    CSV.build(outfile) do |writer|
      writer.row reader.headers
      # pp reader.headers
      # pp reader.indices
      reader.each do |entry|
        (1..loop_count).each do |loop_index|
          row = entry.row.to_a
          # pp row
          row[index_cust_id] = "#{entry.row[index_cust_id]}#{loop_index}"
          writer.row row
          counter += 1
          if counter % 2000000 == 0
            p "[#{Time.local}] lines: #{counter.humanize}"
            # p `ps -p #{Process.pid} -o rss`
            # p `ls -lh #{output_file_name}`
            # p `ps -caxm -orss= | awk '{ sum += $1 } END { print "Resident Set Size: " sum/1024 " MiB" }'`
            # p GC.stats
            # GC.collect
            outfile.flush
          end
        end
      end
    end
  end
end
p `ls -lh *.csv`
p "finished!"
My key constraints are:
- the input and output CSV files will both be larger than my RAM
- the output file will come close to filling up my disk
- AWS says that S3 will upload the file faster if it's GZIP'd
- AWS says that Redshift will read the file faster if it's GZIP'd in S3
There's another overload of CSV.build that takes an IO as the first argument (this is true in general for every method in the standard library that builds something into a string). So, the way to do it would be to:
1. Open a file and get an IO for it using File.open(..., "w")
2. Open a Gzip::Writer and pass it the IO from the previous point
3. Do CSV.build and pass it the IO from the previous point
Right now I don't have time to provide some code for this, but let us know if you get stuck. I can help later, or others will probably do it too.
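For reference, a minimal sketch of that chaining, assuming Crystal 0.35+ where the module lives under "compress/gzip" (the file name and rows are only placeholders):
require "csv"
require "compress/gzip"

File.open("data_after.csv.gz", "w") do |file|
  Compress::Gzip::Writer.open(file) do |gzip|
    CSV.build(gzip) do |csv|
      csv.row ["id", "name"]
      csv.row ["1", "example"]
    end
  end
end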
This is what I've got (minus my business logic). If I don't update, assume it worked in 0.35.1:
require "csv"
require "compress/gzip"
# time crystal run --release convert_gzip.cr
input_file_name = "data_before.csv"
p "converting file #{input_file_name}"
loop_count = 1
counter = 0
output_file_name = "data_after.csv.gz"
# File.open(output_file_name = "data_after.csv", "w") do |outfile|
Compress::Gzip::Writer.open(output_file_name) do |outfile|
  File.open(input_file_name) do |infile|
    reader = CSV.new(infile, headers: true)
    index_cust_id = reader.indices["cust_id"]
    CSV.build(outfile) do |writer|
      writer.row reader.headers
      reader.each do |entry|
        (1..loop_count).each do |loop_index|
          row = entry.row.to_a
          row[index_cust_id] = "#{entry.row[index_cust_id]}#{loop_index}"
          writer.row row
          counter += 1
          # pp ({:line => __LINE__})
          if counter % 2000000 == 0
            p "lines: #{counter.humanize}"
            # p `ps -p #{Process.pid} -o rss`
            p `ls -lh #{output_file_name}`
            p `ps -caxm -orss= | awk '{ sum += $1 } END { print "Resident Set Size: " sum/1024 " MiB" }'`
            # p GC.stats
            # GC.collect
            outfile.flush
          end
        end
      end
    end
  end
  outfile.flush
end
pp ({:line => __LINE__})
p ({:counter => counter.humanize})
p `ls -lh *.csv *.csv.gz`
p "finished!"
thanks!
jzakiya
November 11, 2020, 10:42pm
FYI, for ultimate performance, and per the discussion quoted below:
It seems that some benchmark results are wrong. LLVM is very good at optimizing, and some operations are simply not executed because the output result is not used anywhere in the program. To avoid this, simply assign the result to a variable and puts its value at the end of the program.
require "benchmark"
res_a = Bool
res_b = Bool
Benchmark.ips do |x|
n, r = 1213335u64, 5u64
puts "for u64 value"
# This operation will be skipped by LLVM
x.report(" n % r") { n % r == 0 }
#…
Instead of:
if counter % 2000000 == 0
do:
if counter.remainder(2000000) == 0
or:
if counter.divisible_by?(2000000)
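Putting that advice together, here is a small hedged sketch of how one might benchmark the three spellings while keeping the results live, so LLVM cannot discard the work (the value and labels are illustrative):
require "benchmark"

res_a = res_b = res_c = false
n = 1213335_u64

Benchmark.ips do |x|
  x.report("n % 5 == 0") { res_a = n % 5 == 0 }
  x.report("n.remainder(5) == 0") { res_b = n.remainder(5) == 0 }
  x.report("n.divisible_by?(5)") { res_c = n.divisible_by?(5) }
end

# Print the results so the compiler cannot treat the work as dead code.
puts({res_a, res_b, res_c})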
I would like to skip the filesystem and upload the GZIP'd output straight to S3, but I'm not sure how to fit IO.new and upload = uploader.upload(bucket: ENV["AWS_BUCKET"], object: uploaded_filename, io: infile) in with the GZIP writer to get the desired effect.
Yeah, it’s really hard to write to an IO that will write to an HTTP request.
I think what you’ll need to do here is use a pipe:
reader, writer = IO.pipe
spawn do
  # Here you write the gzipped csv file to the writer
end
# Here you pass the `reader` IO to AWS
uploader.upload(bucket: ENV["AWS_BUCKET"], object: uploaded_filename, io: reader)
Note: I didn’t try it and I’m not sure it will work.
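For illustration, a minimal sketch of what the spawn body could contain, reusing the gzip + CSV chain from above (the rows are placeholders, and the upload call is left commented out; whether the S3 library accepts a pipe is exactly what gets tested next):
require "csv"
require "compress/gzip"

reader, writer = IO.pipe

spawn do
  # Produce the gzipped CSV into the write end of the pipe.
  Compress::Gzip::Writer.open(writer) do |gzip|
    CSV.build(gzip) do |csv|
      csv.row ["cust_id", "name"]
      csv.row ["42", "example"]
    end
  end
  # Close the write end so the reading side sees EOF.
  writer.close
end

# Quick self-check: drain the read end and show how many compressed bytes came through.
puts reader.gets_to_end.bytesize
# In the real program you would instead hand `reader` to the uploader, e.g.:
# uploader.upload(bucket: ENV["AWS_BUCKET"], object: uploaded_filename, io: reader)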
I had to define them as io_reader, io_writer = IO.pipe to not conflict with the CSV reader/writer.
In convert_gzip.cr:55:26
55 | Compress::Gzip::Writer.open(io_writer) do |outfile|
^---
Error: instantiating 'Compress::Gzip::Writer.class#open(IO::FileDescriptor)'
In convert_gzip.cr:95:23
95 | upload = uploader.upload(bucket: ENV["AWS_BUCKET"], object: uploaded_filename, io: io_reader)
^-----
Error: instantiating 'Awscr::S3::FileUploader#upload()'
In lib/awscr-s3/src/awscr-s3/file_uploader.cr:32:13
32 | if io.size < UPLOAD_THRESHOLD
^---
Error: undefined method 'size' for IO::FileDescriptor
IO::FileDescriptor trace:
lib/awscr-s3/src/awscr-s3/file_uploader.cr:32
if io.size < UPLOAD_THRESHOLD
crystal run --release convert_gzip.cr --error-trace 0.71s user 0.27s system 95% cpu 1.017 total
I guess the S3 library expects the size to be known. I don't know enough about IO to do something like chunking every 5 MB into a "part" for an S3 multipart upload.
I found something: https://stackoverflow.com/questions/8653146/can-i-stream-a-file-upload-to-s3-without-a-content-length-header
It might not be supported by the Crystal library you are using, so you might have to add that, or just write to disk and upload it later.
Here is my stream upload of a GZIP’d generated CSV:
require "csv"
require "compress/gzip"
require "awscr-s3"
# time crystal run --release app.cr
client = Awscr::S3::Client.new(
  region: ENV["AWS_REGION"],
  aws_access_key: ENV["AWS_KEY"],
  aws_secret_key: ENV["AWS_SECRET"]
)
uploader = Awscr::S3::FileUploader.new(client)
input_file_name = "input.csv"
uploaded_filename = "output.csv.gz"
p "converting file #{input_file_name}"
status_interval = 2000000
size_check_interval = 10000
# S3 max file size: 1TB
# S3 max number of parts: 10,000
# part_size = 100000000 # 100MB for a 1TB ceiling
# part_size = 50000000 # 50MB for a 500GB ceiling
part_size = 25000000 # 25MB for a 250GB ceiling
counter = 0
multipart_upload = client.start_multipart_upload(
  bucket: ENV["AWS_BUCKET"],
  object: uploaded_filename)
part_number = 1
parts = Array(Awscr::S3::Response::UploadPartOutput).new
io = IO::Memory.new
Compress::Gzip::Writer.open(io) do |outfile|
  File.open(input_file_name) do |infile|
    csv_reader = CSV.new(infile, headers: true)
    index_cust_id = csv_reader.indices["cust_id"]
    CSV.build(outfile) do |writer|
      writer.row csv_reader.headers
      csv_reader.each do |entry|
        row = entry.row.to_a
        # do business stuff
        row[index_cust_id] = "#{entry.row[index_cust_id]}_b"
        writer.row row
        counter += 1
        if counter.divisible_by?(size_check_interval)
          if io.bytesize > part_size
            pp ({:line => __LINE__, :"io.bytesize" => io.bytesize.humanize, :part_number => part_number})
            resp = client.upload_part(
              bucket: ENV["AWS_BUCKET"],
              object: uploaded_filename,
              part: io.to_slice,
              part_number: part_number,
              upload_id: multipart_upload.upload_id)
            # pp resp
            parts << resp
            # empty the IO because the whole point
            # of streaming is to conserve RAM/disk
            io.clear
            part_number += 1
          end
        end
        if counter.divisible_by?(status_interval)
          p "[#{Time.local}] lines: #{counter.humanize}"
          # p `ps -p #{Process.pid} -o rss`
          # p `ls -lh #{output_file_name}` # no local output file in this streaming version
          p `ps -caxm -orss= | awk '{ sum += $1 } END { print "Resident Set Size: " sum/1024 " MiB" }'`
        end
      end
    end
  end
end
# if we have a final part, upload it
if io.bytesize > 0
  pp ({:line => __LINE__, :"io.bytesize" => io.bytesize.humanize, :part_number => part_number, :upload_id => multipart_upload.upload_id})
  resp = client.upload_part(
    bucket: ENV["AWS_BUCKET"],
    object: uploaded_filename,
    upload_id: multipart_upload.upload_id,
    part_number: part_number,
    part: io.to_slice)
  parts << resp
  io.clear
end
# tell AWS to finalize the parts
client.complete_multipart_upload(
  bucket: ENV["AWS_BUCKET"],
  object: uploaded_filename,
  upload_id: multipart_upload.upload_id,
  parts: parts.sort_by(&.part_number)
)
kees
November 18, 2020, 5:39pm
@carcinocron thank you for sharing this example!