1
+ var
2
+ csv = require ( 'csv' ) ,
3
+ fs = require ( 'fs' ) ,
4
+ async = require ( 'async' ) ,
5
+ multimeter = require ( 'multimeter' ) ,
6
+ multi = multimeter ( process ) ,
7
+ sys = require ( 'sys' ) ,
8
+ exec = require ( 'child_process' ) . exec ,
9
+ inside = require ( 'point-in-polygon' ) ,
10
+ cluster = require ( 'cluster' ) ,
11
+ numCPUs = require ( 'os' ) . cpus ( ) . length ,
12
+ cursor = 0 ;
13
+
14
+ //TODO: Find a way to write Status... DONE more neatly (status updates)
15
+
16
+ multi . on ( '^C' , function ( ) {
17
+ process . exit ( 1 ) ;
18
+ } ) ;
19
+
20
+ multi . charm . reset ( ) ;
21
+
22
+ function lineCount ( filename , callback ) {
23
+ exec ( 'wc -l ' + filename + ' | cut -d " " -f 3' , function ( error , stdout , stderr ) {
24
+ callback ( error , stdout . trim ( ) ) ;
25
+ } ) ;
26
+ }
27
+
28
+ function writeStatus ( msg , pos ) {
29
+ if ( ! pos ) {
30
+ cursor ++ ;
31
+ pos = cursor ;
32
+ }
33
+ multi . charm . position ( 0 , pos ) ;
34
+ multi . charm . write ( msg ) ;
35
+ multi . charm . cursor ( false ) ;
36
+
37
+ return pos ;
38
+ }
39
+
40
+ function loadPoints ( filename , callback ) {
41
+ var msg = "Loading GeoJSON points from " + filename + "... " ;
42
+ var pos = writeStatus ( msg ) ;
43
+ fs . readFile ( filename , function ( err , data ) {
44
+ var features = JSON . parse ( data ) . features , points = [ ] ;
45
+ features . forEach ( function ( feature ) {
46
+ points . push ( feature . geometry . coordinates ) ;
47
+ } ) ;
48
+ cursor = writeStatus ( msg + "Done\n" , pos ) ;
49
+ callback ( null , points ) ;
50
+ } ) ;
51
+ }
52
+
53
+ cluster . setupMaster ( {
54
+ exec : "crime-block-worker.js" ,
55
+ silent : true
56
+ } ) ;
57
+
58
+ if ( cluster . isMaster ) {
59
+ var bar ;
60
+
61
+ async . series ( [
62
+ function loadCensusBlocks ( callback ) {
63
+ loadPoints ( 'data/source/census_blocks.json' , function ( err , points ) {
64
+ callback ( err , points ) ;
65
+ } ) ;
66
+ } ,
67
+ function loadCrimes ( callback ) {
68
+ loadPoints ( './crimes.json' , function ( err , points ) {
69
+ callback ( err , points ) ;
70
+ } ) ;
71
+ }
72
+ ] ,
73
+ function ( err , results ) {
74
+ var child , crimes = results [ 1 ] , geoms = results [ 0 ] , crime_count = crimes . length , crimes_done = 0 , crimes_per_thread = Math . round ( crime_count / numCPUs ) , pos , msg ;
75
+ msg = "Spawning " + numCPUs + " workers for processing... " ;
76
+ pos = writeStatus ( msg ) ;
77
+ for ( var i = 0 ; i < numCPUs ; i ++ ) {
78
+ child = cluster . fork ( { } ) ;
79
+ child . send ( { crimes : crimes . slice ( i * crimes_per_thread , ( i + 1 ) * crimes_per_thread ) , geoms : geoms } ) ;
80
+
81
+ child . on ( 'message' , function ( msg ) {
82
+ crimes_done += msg . crimes . length ;
83
+ bar . ratio ( crimes_done , crime_count ) ;
84
+ msg . crimes . forEach ( function ( crime ) {
85
+ console . log ( crimes [ crime ] ) ;
86
+ } ) ;
87
+ } ) ;
88
+ }
89
+ writeStatus ( msg + " Done\n" , pos ) ;
90
+ writeStatus ( "This process will take a few minutes. It starts off slow and then speeds up. Please wait.\n" ) ;
91
+
92
+ bar = multi ( 0 , cursor + 1 , { width : 80 } ) ;
93
+ bar . ratio ( 0 , crime_count ) ;
94
+
95
+ cluster . disconnect ( function ( ) {
96
+ bar . ratio ( crime_count , crime_count ) ;
97
+ writeStatus ( "\n\nProcess complete.\n" ) ;
98
+ process . exit ( 0 ) ;
99
+ } ) ;
100
+ } ) ;
101
+
102
+ process . on ( 'exit' , function ( code , signal ) {
103
+ cluster . disconnect ( ) ;
104
+ multi . charm . cursor ( true ) ;
105
+ } ) ;
106
+
107
+ }
0 commit comments