The Crystal Programming Language Forum

Write CSV straight to disk as GZiped fiile?

Is there a way to GZIP a CSV file while writing it? I didn’t understand how to combine the IO docs with the GZIP docs with the CSV docs.

require "csv"
require "gzip"
# crystal run --release convert.cr

input_file_name = "data_before.csv"

p "converting file #{input_file_name}"

counter = 0

output_file_name = "data_after.csv"
p ({:processing => {:output_file_name => output_file_name}})
File.open(input_file_name) do |infile|
  reader = CSV.new(infile, header = true)

  index_cust_id = reader.indices["cust_id"]

  File.open(output_file_name, "w") do |outfile|
    CSV.build(outfile) do |writer|
      writer.row reader.headers

      # pp reader.headers
      # pp reader.indices
      reader.each do |entry|
        (1..loop_count).each do |loop_index|
          row = entry.row.to_a
          # pp row
          row[index_cust_id] = "#{entry.row[index_cust_id]}#{loop_index}"
          writer.row row

          counter += 1

          if counter % 2000000 == 0
            p "[#{date}] lines: #{counter.humanize}"
            # p `ps -p #{Process.pid} -o rss`
            # p `ls -lh #{output_file_name}`
            # p `ps -caxm -orss= | awk '{ sum += $1 } END { print "Resident Set Size: " sum/1024 " MiB" }'`
            # p GC.stats
            # GC.collect
            outfile.flush
          end
        end
      end
    end
  end
end

p `ls -lh *.csv`

p "finished!"

My key constraints are:

  • the input and output CSV files will both be larger than my RAM
  • the output file will come close to filling up my disk
  • AWS says that S3 will upload the file faster if it’s GZIP’d
  • AWS says that redshift will read the file faster if it’s GZIP’d in S3

There’s another overload of CSV.build that takes an IO as a first argument (this is in general true for every method in the standard library that build something into a string).

So, the way to do it would be to:

  1. Open a file and get an IO to do that using File.open(..., "w")
  2. Open a GZip::Writer and pass is the IO from the previous point
  3. Do CSV.build and pass the IO from the previous point

Right now I don’t have time to provide some code for this, but let us know if you get stuck, I can help later, or others will probably do it too.

this is what I’ve got (minus my business logic) If I don’t update, assume it worked in 0.35.1:

require "csv"
require "compress/gzip"
# time crystal run --release convert_gzip.cr

input_file_name = "data_before.csv"

p "converting file #{input_file_name}"

loop_count = 1
counter = 0

output_file_name = "data_after.csv.gz"

# File.open(output_file_name = "data_after.csv", "w") do |outfile|
Compress::Gzip::Writer.open(output_file_name) do |outfile|
  File.open(input_file_name) do |infile|
    reader = CSV.new(infile, header = true)

    index_cust_id = reader.indices["cust_id"]

    CSV.build(outfile) do |writer|
      writer.row reader.headers
      reader.each do |entry|
        (1..loop_count).each do |loop_index|
          row = entry.row.to_a

          row[index_cust_id] = "#{entry.row[index_cust_id]}{loop_index}"

          writer.row row
          counter += 1

          # pp ({:line => __LINE__})
          if counter % 2000000 == 0
            p "lines: #{counter.humanize}"
            # p `ps -p #{Process.pid} -o rss`
            p `ls -lh #{output_file_name}`
            p `ps -caxm -orss= | awk '{ sum += $1 } END { print "Resident Set Size: " sum/1024 " MiB" }'`
            # p GC.stats
            # GC.collect
            outfile.flush
          end
        end
      end
    end
  end

  outfile.flush
end

pp ({:line => __LINE__})
p ({:counter => counter.humanize})

p `ls -lh *.csv *.csv.gz`

p "finished!"

thanks!

1 Like

FYI, for ultimate performance, and per discussion below,

instead of

if counter % 2000000 == 0

do

if counter.remainder(2000000) == 0

or

if counter.divisible_by?(2000000)

4 Likes

I would like to skip the filesystem and upload the GZIP’d straight to S3, but I’m not sure how to add IO.new and upload = uploader.upload(bucket: ENV["AWS_BUCKET"], object: uploaded_filename, io: infile) in with the GZIP writer to get the desired effect.

Yeah, it’s really hard to write to an IO that will write to an HTTP request.

I think what you’ll need to do here is use a pipe:

reader, writer = IO.pipe
spawn do
  # Here you write the gzipped csv file to the writer
end

# Here you pass the `reader` IO to AWS
uploader.upload(bucket: ENV["AWS_BUCKET"], object: uploaded_filename, io: reader)

Note: I didn’t try it and I’m not sure it will work.

I had to define them as io_reader, io_writer = IO.pipe to not conflict with the CSV reader/writer.

In convert_gzip.cr:55:26

 55 | Compress::Gzip::Writer.open(io_writer) do |outfile|
                             ^---
Error: instantiating 'Compress::Gzip::Writer.class#open(IO::FileDescriptor)'


In convert_gzip.cr:55:26

 55 | Compress::Gzip::Writer.open(io_writer) do |outfile|
                             ^---
Error: instantiating 'Compress::Gzip::Writer.class#open(IO::FileDescriptor)'


In convert_gzip.cr:95:23

 95 | upload = uploader.upload(bucket: ENV["AWS_BUCKET"], object: uploaded_filename, io: io_reader)
                        ^-----
Error: instantiating 'Awscr::S3::FileUploader#upload()'


In lib/awscr-s3/src/awscr-s3/file_uploader.cr:32:13

 32 | if io.size < UPLOAD_THRESHOLD
            ^---
Error: undefined method 'size' for IO::FileDescriptor

IO::FileDescriptor trace:

  lib/awscr-s3/src/awscr-s3/file_uploader.cr:32

          if io.size < UPLOAD_THRESHOLD

crystal run --release convert_gzip.cr --error-trace  0.71s user 0.27s system 95% cpu 1.017 total

I guess the S3 library anticipates that the size is known. I don’t know enough about IO to do something like chunk every 5MB to a “part” for S3 multipart upload.

I found something: https://stackoverflow.com/questions/8653146/can-i-stream-a-file-upload-to-s3-without-a-content-length-header

It might not be supported by the Crystal library you are doing so you might have to add that, or just write to disk and upload that later.

Here is my stream upload of a GZIP’d generated CSV:

require "csv"
require "compress/gzip"
require "awscr-s3"
# time crystal run --release app.cr

client = Awscr::S3::Client.new(
  region: ENV["AWS_REGION"],
  aws_access_key: ENV["AWS_KEY"],
  aws_secret_key: ENV["AWS_SECRET"]
)
uploader = Awscr::S3::FileUploader.new(client)

input_file_name = "input.csv"
uploaded_filename = "output.csv.gz"

p "converting file #{input_file_name}"

status_interval = 2000000
size_check_interval = 10000

# S3 max file size: 1TB
# S3 max number of parts: 10,000
# part_size = 100000000 # 100MB for a 1TB ceiling
# part_size = 50000000 # 50MB for a 500GB ceiling
part_size = 25000000 # 25MB for a 250GB ceiling

counter = 0

multipart_upload = client.start_multipart_upload(
  bucket: ENV["AWS_BUCKET"],
  object: uploaded_filename)
part_number = 1
parts = Array(Awscr::S3::Response::UploadPartOutput).new

io = IO::Memory.new

Compress::Gzip::Writer.open(io) do |outfile|
  File.open(input_file_name) do |infile|
    csv_reader = CSV.new(infile, header = true)

    index_cust_id = csv_reader.indices["cust_id"]

    CSV.build(outfile) do |writer|
      writer.row csv_reader.headers
      csv_reader.each do |entry|
        row = entry.row.to_a

        # do business stuff
        row[index_cust_id] = "#{entry.row[index_cust_id]}_b"

        writer.row row
        counter += 1

        if counter.divisible_by?(size_check_interval)
          if io.bytesize > part_size
            pp ({:line => __LINE__, :"io.bytesize" => io.bytesize.humanize, :part_number => part_number})
            resp = client.upload_part(
              bucket: ENV["AWS_BUCKET"],
              object: uploaded_filename,
              part: io.to_slice,
              part_number: part_number,
              upload_id: multipart_upload.upload_id)
            # pp resp
            parts << resp
            # empty the IO because the whole point
            # of stream is to conserve RAM/disk
            io.clear
            part_number += 1
          end
        end
        if counter.divisible_by?(status_interval)
          p "[#{date}] lines: #{counter.humanize}"
          # p `ps -p #{Process.pid} -o rss`
          p `ls -lh #{output_file_name}`
          p `ps -caxm -orss= | awk '{ sum += $1 } END { print "Resident Set Size: " sum/1024 " MiB" }'`
        end
      end
    end
  end
end

# if we have a final part, upload it
if io.bytesize > 0
  pp ({:line => __LINE__, :"io.bytesize" => io.bytesize.humanize, :part_number => part_number, :upload_id => multipart_upload.upload_id})

  resp = client.upload_part(
    bucket: ENV["AWS_BUCKET"],
    object: uploaded_filename,
    upload_id: multipart_upload.upload_id,
    part_number: part_number,
    part: io.to_slice)
  parts << resp
  io.clear
end

# tell AWS to finallize the parts
client.complete_multipart_upload(
  bucket: ENV["AWS_BUCKET"],
  object: uploaded_filename,
  upload_id: multipart_upload.upload_id,
  parts: parts.sort_by(&.part_number)
)
1 Like

@InstanceOfMichael thank you for sharing thie example!