Thursday, June 21, 2012

Parallel Processing of File Data, Iterator groups and Sequences FTW!

I have occasion to process very large files here and there, and it turns out Scala is very good at this in general.  There is a nice feature on Scala's iterators (including the one you get from BufferedSource via getLines()) that allows you to break up file parsing or processing into chunks so that parallelization can be achieved.

If you've tried the obvious solution, simply adding .par, you'll find the method isn't present on an Iterator.  So, you might convert to a List with toList.  When you convert like this, Scala will materialize all the lines into a List in memory before passing it on.  If you have a large file, you'll quickly run out of memory and your process will crash with an OutOfMemoryError.
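
Concretely, the dead end looks like this (LogParser.parseItem here is the same per-line parser used in the working version below):

// Materializes every line before any parallel work begins; on a big
// enough file this blows the heap with an OutOfMemoryError.
io.Source.stdin.getLines().toList.par.map(LogParser.parseItem)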

The grouped() method offers us another way to do this.  You pass a group size into the call to break your stream into chunks.  So, instead of a single String iterator made up of millions of entries, one for each line, you get an Iterator of Sequences with 10,000 lines in each.  A BufferedSource is itself a kind of Iterator, and any Iterator can be grouped in this way, Sequences and Lists included.  Now each chunk is a Sequence with a finite element count, so you can parallelize its processing to increase throughput, and flatMap the results back together at the end.

The code looks something like this:

io.Source.stdin.getLines()
  .grouped(10000)                       // Iterator[Seq[String]], 10,000 lines per chunk
  .flatMap { chunk =>
    chunk.par.map(LogParser.parseItem)  // parse each chunk's lines in parallel
  }
  .flatMap(x => x)                      // flatten parseItem's results (e.g. Option[LogRecord]), dropping failures
  .foreach { record: LogRecord =>
    println(record.toString)
  }

So with this, we can read lines from stdin as a buffered source, and also parallelize without the need to hold the entire dataset in memory!

At the moment, I couldn't find an easy way to force Scala to increase the parallelization level beyond your CPU core count.  As far as I know, this kind of I/O splitting wasn't what the parallel collection operations had in mind; it's more a job for Akka or similar.  Fortunately, in Scala 2.10 we'll get Promises and Futures, which will make this kind of thing much more powerful and give us easier knobs and dials to turn for concurrency configuration.  Hopefully I'll post on that when it happens!
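
For the curious, 2.10's parallel collections are also slated to expose a configurable task support, so a per-collection thread count should look something like the sketch below.  The API isn't final, so treat this as an assumption rather than tested code:

import scala.collection.parallel.ForkJoinTaskSupport
import scala.concurrent.forkjoin.ForkJoinPool

// Sketch against the in-progress Scala 2.10 API: give one parallel
// collection its own fork/join pool, sized past the core count.
val chunk = (1 to 10000).par
chunk.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(32)) // 32 worker threads
val results = chunk.map(n => n * 2) // placeholder for real per-item work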

Tuesday, June 12, 2012

Parsing CSVs in Scala

I did a quick Google search on parsing CSVs in Scala, and one of the top hits was a Stack Overflow question where the answer was wrong.  Very wrong.  So, I threw together a quick parser in Scala to get the job done.  I'm not saying it's good, but it passes the spec tests I have, which include quoted fields and quoted commas with both single and double quotes.  I hope this is useful, and perhaps somebody can improve upon it.

import scala.util.parsing.combinator.RegexParsers

object CSVParser extends RegexParsers {
  def apply(f: java.io.File): Iterator[List[String]] = io.Source.fromFile(f).getLines().map(apply(_))
  def apply(s: String): List[String] = parseAll(fromCsv, s) match {
    case Success(result, _) => result
    case failure: NoSuccess => throw new Exception("Parse failed: " + failure.msg)
  }

  // A line is one or more terms, each followed by an optional comma.
  // A term may be double-quoted, single-quoted, or bare.
  def fromCsv: Parser[List[String]] = rep1(mainToken)
  def mainToken = (doubleQuotedTerm | singleQuotedTerm | unquotedTerm) <~ ",?".r
  def doubleQuotedTerm: Parser[String] = "\"" ~> "[^\"]+".r <~ "\""
  def singleQuotedTerm: Parser[String] = "'" ~> "[^']+".r <~ "'"
  def unquotedTerm: Parser[String] = "[^,]+".r

  // Whitespace is significant inside CSV fields, so don't skip it.
  override def skipWhitespace = false
}
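
As a quick sanity check, with some made-up input it behaves like this:

CSVParser("""one,"two, with comma",'three'""")
// => List("one", "two, with comma", "three")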

Wednesday, June 6, 2012

Data Migration - Scala and Play along the Way

I've been nibbling at a data migration system for many years.  It's gone through various transformations, and its latest incarnation is mostly working.  I forget the program's original purpose, but its main use for a while has been to extract the EVE Online database data from the SQL Server database dump that CCP kindly provides.  Each EVE revision, I take the CCP dump, spin up a Windows server in the cloud, import the database, and extract what I need to port it into PostgreSQL, which is my system of choice.


Over the years, JDBC has improved, and technologies have moved along.  In the beginning I wrote Hermes-DB, a simple ORM that was very much not type safe, but handled the auto-probing of table information that comes with a more dynamic style of ORM.  One can argue that this isn't really ORM at all, and at this point, I'm inclined to agree.


Having said that, the auto-probing capabilities turned out to be very, very useful for extracting data.  Because the system was predicated on the idea that learning about the database should be the job of the framework, not the developer, it had a reasonably well-formed concept of representing tables and columns as objects.  With a bit of tweaking, adding a new metadata class along the way, the package can now represent a table definition fairly well.
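
Roughly speaking, the representation amounts to something like the sketch below (these names are illustrative, not the actual Hermes-DB classes):

// Illustrative sketch only; the real classes carry more JDBC metadata.
case class Column(name: String, sqlType: Int, nullable: Boolean, primaryKey: Boolean)
case class TableDef(name: String, columns: List[Column])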


What this allows me to do today is create both a solid database dump and the DDL to build the table structure.  Theoretically the system could be modified to pull from any datastore and generate for any other datastore; it was built in a way that will hopefully facilitate that.


2012 rolls around, and things have changed.  The landscape for web development has been shifting over the last decade as people struggle to find ways to get the tools out of the developer's way, letting them do their job more and fight with code less.  The most recent evolution in that sequence that I've been working with is Scala and Play.  As I work with these two tools, I'm increasingly finding it easier to build systems that are stable and take much less code to write.


Hermes-DB was originally designed just to output DDL, but when I started working with JPA, a system that requires a whole lot of scaffolding, it made sense to have one of the output "DDLs" be Java classes with JPA annotations.  Over the last few days, I've been making a new variety of output: Scala case classes designed to work with Play, and therefore Anorm.  Anorm is very powerful and gives you tools that "get out of your way", but it doesn't offer much when it comes to scaffolding.  I've poked around a bit, and it seems there was a scaffolding plugin for Play 1, but none exists for Play 2.  This little utility is helping fill that gap for me.  It outputs Scala class and companion object definitions based on the database schema, along the lines of the example below.
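
To give a flavor of the output, the generated code for a table looks roughly like this.  The table and column names are invented for illustration; this shows the shape, not the literal output:

case class InvType(typeID: Long, typeName: String, description: Option[String])

object InvType {
  import anorm._
  import anorm.SqlParser._

  // Row parser mapping the table's columns onto the case class.
  val simple = {
    get[Long]("invTypes.typeID") ~
    get[String]("invTypes.typeName") ~
    get[Option[String]]("invTypes.description") map {
      case typeID ~ typeName ~ description => InvType(typeID, typeName, description)
    }
  }
}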


The EVE Online database comes out of the box with about 75 tables.  That's 75 tables I'd rather not have to manually create model-class mappings for.  This little utility made my life much easier.  A big cheer for code generation tools!


It is open source, of course, and can be found on Gitorious with the git URL: git@gitorious.org:export4pg/export4pg.git


Please note that some of this code is very, very old.  It's worked for probably close to a decade, so some of it is a bit ancient in both understanding and coding style.  It is, however, very useful, and possibly one of the pieces of code I've written that's still in use and not broken by constant tinkering!