Thanks to my mentor, I have been closely following the Big Data book (MEAP). In Chapter 6, the author discusses the user-identifier normalization problem, which is solved with an iterative graph algorithm, and he provides a reference implementation in Cascalog.
Since I have been using Scalding for close to a year now, I rewrote the same algorithm in Scalding. This is my first attempt at writing an iterative algorithm with Scalding.
Any kind of feedback is highly appreciated.
Since I have been using Scalding for close to a year now, I rewrote the same algorithm in Scalding. This is my first attempt at writing an iterative algorithm with Scalding.
Any kind of feedback is highly appreciated.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.twitter.scalding.{Tsv, Job, Args} | |
import scala.collection.immutable.TreeSet | |
/*
Let's assume we are reading a file of (a, b) pairs, where each pair means that node a and node b are connected in a graph.
For simplicity we will assume that a and b are ints. We keep iterating until the mapping of the nodes reaches a fixed point,
i.e. until an iteration produces no new edges.
*/
class FixedPointJob(args: Args) extends Job(args) {
  /**
   * One iteration of the iterative user-identifier normalization job.
   *
   * Arguments:
   *  - `input`: TSV of `(n1, n2)` integer edges.
   *  - `output-base-dir`: directory under which this iteration writes `run-<iteration>`.
   *  - `progress`: TSV path receiving the rows flagged as progress. The same path is reused by
   *    every iteration (presumably replaced on each write -- depends on the Tsv sink mode).
   *  - `iteration`: iteration number, defaults to 1.
   *
   * While the progress sink is non-empty after a run, [[next]] schedules another iteration whose
   * input is this run's output.
   */
  val input = args("input")
  val outputBaseDir = args("output-base-dir")
  val progressSink = args("progress")
  val iteration = args.getOrElse("iteration", "1").toInt
  // This iteration's output directory; it becomes the `input` of the next iteration.
  val sink = outputBaseDir + "/run-" + iteration

  /** Emits an edge in both directions so that grouping on either endpoint sees all of its neighbours. */
  def bidirectionalEdge(tuple: (Int, Int)): Iterable[(Int, Int)] = {
    val (node1, node2) = tuple
    Iterable((node1, node2), (node2, node1))
  }

  /**
   * Collapses one group of `(groupedNode, neighbour)` pairs into edges from the smallest id.
   *
   * Every pair in the group carries the same grouped node. The result links the smallest id
   * among the grouped node and its neighbours to every other id, as `(smallest, other, progress)`.
   *
   * `progress` is true when the grouped node is not itself the smallest and has at least two
   * other ids. In that case, edges from `smallest` to the grouped node's other neighbours are
   * emitted; those did not go through this group before.
   *
   * NOTE(review): a node whose only edge is a self-loop yields `allIds == Set(node)` and
   * therefore produces no output edge.
   */
  def iterateEdges(edges: Iterator[(Int, Int)]): List[(Int, Int, Boolean)] = {
    // A group produced by groupBy is never empty, so the first next() is safe.
    val (grouped, first) = edges.next()
    val allIds = TreeSet(grouped, first) ++ edges.map(_._2)
    val smallest = allIds.head
    val progress = allIds.size > 2 && grouped != smallest
    // Map to a List rather than another TreeSet: the ids are already sorted and distinct, so
    // building a sorted set of tuples would only add work without changing the order.
    (allIds - smallest).toList.map(id => (smallest, id, progress))
  }

  // Fields after the group: ('node1, 'node2, 'isNew) = (smallest id, other id, progress flag).
  val iterationSoFar = Tsv(input, fields = ('n1, 'n2))
    .flatMapTo(('n1, 'n2) -> ('b1, 'b2))(bidirectionalEdge)
    .groupBy('b2)(_.mapStream(('b2, 'b1) -> ('node1, 'node2, 'isNew))(iterateEdges))
    .project(('node1, 'node2, 'isNew))

  iterationSoFar
    .filter('isNew)(identity[Boolean])
    .write(Tsv(progressSink))

  // Several groups can emit the same (node1, node2) edge, so deduplicate before writing.
  iterationSoFar
    .distinct(('node1, 'node2))
    .write(Tsv(sink))

  /**
   * Returns the next iteration (reading this run's output) while the progress sink is non-empty,
   * or None once an iteration flagged no progress.
   */
  override def next: Option[Job] = {
    val nextArgs = args + ("input", Some(sink)) +
      ("iteration", Some((iteration + 1).toString))
    // Only emptiness matters here; reading the first two columns of a row is enough.
    val madeProgress = !Tsv(progressSink).readAtSubmitter[(Int, Int)].isEmpty
    if (madeProgress) Some(clone(nextArgs)) else None
  }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.scalatest.FunSuite | |
import com.twitter.scalding.{FieldConversions, TupleConversions, Tsv, JobTest} | |
import scala.collection.mutable | |
import org.scalatest.matchers.ShouldMatchers | |
class FixedPointJobTest extends FunSuite with TupleConversions with FieldConversions with ShouldMatchers {

  test("should work for the example in the book") {
    val exampleGraphInput = List(
      (1, 4), (4, 3), (4, 5), (5, 2), (5, 11)
    )
    val graphAfterIteration1 = List(
      (1, 4), (1, 3), (1, 5), (3, 4), (2, 4), (2, 5), (2, 11), (5, 11)
    )
    val graphAfterIteration2 = List(
      (1, 3), (1, 4), (1, 5), (1, 11), (1, 2), (2, 5), (2, 4), (2, 11)
    )
    val finalExpectedGraph = List(
      (1, 2), (1, 3), (1, 4), (1, 5), (1, 11)
    )

    /**
     * Sink check: the edges written by a run must equal `expectedGraph` exactly.
     * Comparing sorted lists catches missing edges and duplicates, not only unexpected edges.
     */
    def validateOutput(runString: String, expectedGraph: List[(Int, Int)])(buffer: mutable.Buffer[(Int, Int)]): Unit =
      withClue(runString) {
        buffer.toList.sorted should be(expectedGraph.sorted)
      }

    // Each iteration reads the previous run's directory, so the expected graph of run N is also
    // registered as a source: it is the input of iteration N + 1. Iteration 4 is expected to
    // flag no progress, so the empty run-4 source should only be read if a 5th iteration runs.
    JobTest(new FixedPointJob(_))
      .arg("input", "input")
      .arg("output-base-dir", "output-base-dir")
      .arg("progress", "progress")
      .arg("iteration", "1")
      .source(Tsv("input", fields = ('n1, 'n2)), exampleGraphInput)
      .source(Tsv("output-base-dir/run-1", fields = ('n1, 'n2)), graphAfterIteration1)
      .source(Tsv("output-base-dir/run-2", fields = ('n1, 'n2)), graphAfterIteration2)
      .source(Tsv("output-base-dir/run-3", fields = ('n1, 'n2)), finalExpectedGraph)
      .source(Tsv("output-base-dir/run-4", fields = ('n1, 'n2)), Iterable())
      .sink(Tsv("output-base-dir/run-1"))(validateOutput("Validating Run 1", graphAfterIteration1))
      .sink(Tsv("output-base-dir/run-2"))(validateOutput("Validating Run 2", graphAfterIteration2))
      .sink(Tsv("output-base-dir/run-3"))(validateOutput("Validating Run 3", finalExpectedGraph))
      .sink(Tsv("output-base-dir/run-4"))(validateOutput("Validating Run 4", finalExpectedGraph))
      .sink(Tsv("progress"))(doNothing)
      .run
      .finish
  }

  /** Sink callback for outputs this test does not inspect (the progress flags). */
  def doNothing(buffer: mutable.Buffer[(Int, Int)]): Unit = ()
}
No comments:
Post a Comment