Skip to content

Commit

Permalink
performance improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
Jabher committed May 17, 2015
1 parent 3b0affc commit 96b852e
Show file tree
Hide file tree
Showing 7 changed files with 78 additions and 65 deletions.
32 changes: 9 additions & 23 deletions lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ import reflexGroups from './reflectors/groups_reflection.js';
import reflexUsers from './reflectors/users_reflection.js';
import reflexTracks from './reflectors/tracks_reflection.js';

import cypher, {escape} from './utils/DB_connection.js';

import queries from '../data/queries_test.json';
import cypher, {escape} from './utils/graph_connection.js';
import {addTask, getTask} from './utils/task_queue.js';
import queries from '../data/queries.json';

//noinspection JSUnresolvedVariable
var remaining_thread_count = Number(process.env.THREAD_COUNT) || 150;
Expand All @@ -13,10 +13,7 @@ const tasks = {reflexGroups, reflexUsers, reflexTracks};

export async function init() {
for (let query of queries)
await cypher(`
MERGE (query:Task {type: 'reflexGroups', data: ${escape(query)} })
ON CREATE SET query.ts = 0
`);
await addTask(['reflexGroups', query]);
while (remaining_thread_count--)
await start_thread();
}
Expand All @@ -26,30 +23,19 @@ async function start_thread() {
try {
await (await getNextTask());
} catch (e) {
console.log(e);
console.log(e, e.stack);
process.exit(1);
} finally {
await start_thread()
}
}

async function getNextTask() {
var [{task}] = await cypher(`
MATCH (task:Task)
WHERE task.ts < timestamp()
WITH task
ORDER BY task.ts
LIMIT 1
SET task.ts = timestamp()
RETURN task;
`);
console.log(task);
if (!task)
throw new Error('no tasks');

let taskFactory = tasks[task.properties.type];
var [type, data] = await getTask();

let taskFactory = tasks[type];
if (!taskFactory)
throw new Error('unknown task type');

return taskFactory(task.properties.data);
return taskFactory(data);
}
20 changes: 10 additions & 10 deletions lib/reflectors/groups_reflection.js
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
import cypher, {escape} from '../utils/DB_connection.js';
import cypher, {escape} from '../utils/graph_connection.js';
import {addTask} from '../utils/task_queue.js';
import callVK from '../utils/VK_stack_caller.js';

//noinspection JSUnusedGlobalSymbols
export default async function reflexGroupsFromQuery(query) {
let {items: groups} = await callVK('groups.search', {q: query, type: 'page', count: 1000});
console.log(groups);
if (Array.isArray(groups))
for (let group of groups)
await cypher(`
MERGE (g:VK_Group { id: ${group.id} })
ON CREATE SET g.created = timestamp()
MERGE (user_task:Task {type: 'reflexUsers', data: ${group.id} })
ON CREATE SET user_task.ts = 0
MERGE (track_task:Task {type: 'reflexTracks', data: ${group.id} })
ON CREATE SET track_task.ts = 0
`);
await cypher(
`FOREACH (id in {ids} | MERGE (group:VK_Group {id: id}))`,
{ids: groups.map(group => group.id)}
);
for (let group of groups) {
addTask(['reflexUsers', group.id]);
addTask(['reflexTracks', group.id]);
}
}
35 changes: 19 additions & 16 deletions lib/reflectors/tracks_reflection.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import cypher, {escape} from '../utils/DB_connection.js';
import cypher, {escape} from '../utils/graph_connection.js';
import callVK from '../utils/VK_stack_caller.js';

const ts_iteration_key = 'tracks_reflection_ts';
Expand All @@ -14,25 +14,28 @@ export default async function reflexTracksForGroup(group_id, remainingTrackCount
if (posts.length === 0)
break;
for (let post of posts) {
let tracks = post.attachments.map(({audio}) => audio).filter(a => a);
let tracks = (post.attachments || []).map(({audio}) => audio).filter(a => a);
remainingTrackCount -= tracks.length;
remainingWallPostCount -= posts.length;
if (tracks.length > 0) {
await cypher(`
await cypher(`
MATCH (group:VK_Group {id: ${group_id} })
MERGE (post:VK_Wall_Post { id: ${post.id}, date: ${post.date} })
CREATE UNIQUE (group)-[:Published]-(post)
`);

for (let track of tracks)
await cypher(`
MATCH (post:VK_Wall_Post {id: ${post.id}})
MERGE (track:VK_Track {id: ${track.id}, title: ${escape(track.title)}, duration: ${track.duration}})
MERGE (artist:Artist {name: ${escape(track.artist)}})
CREATE UNIQUE (post)-[:Attached]->(track)<-[:Performed]-(artist)
`);
}
FOREACH (track IN {tracks} |
MERGE (trackNode:VK_Track {id: track.id, title: track.title, duration: track.duration})
MERGE (artist:Artist {name: track.artist})
CREATE UNIQUE (post)-[:Attached]->(trackNode)<-[:Performed]-(artist)
)
`,
{
tracks: tracks.map(track => ({
id : track.id,
title : track.title,
duration: track.duration,
artist : track.artist
}))
}
);
}
} while (remainingTrackCount > 0 && remainingWallPostCount > 0)
}

}
19 changes: 9 additions & 10 deletions lib/reflectors/users_reflection.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import cypher, {escape} from '../utils/DB_connection.js';
import cypher, {escape} from '../utils/graph_connection.js';
import callVK from '../utils/VK_stack_caller.js';

const count = 100;
Expand All @@ -8,14 +8,13 @@ export default async function reflexUsersForGroup(group_id) {
let cursor = 0;
do {
var {items: users} = await callVK('groups.getMembers', {group_id: group_id, count, offset: cursor++ * count});

for (let user_id of users)
await cypher(`
await cypher(`
MATCH (g:VK_Group { id: ${group_id} })
MERGE (u:VK_User { id: ${user_id} })
CREATE UNIQUE (u)-[r:Member]->(g)
`);

FOREACH (id IN {users} |
MERGE (u:VK_User { id: id })
CREATE UNIQUE (u)-[r:Member]->(g)
)
`,
{users});
} while (users.length !== 0)
}

}
8 changes: 3 additions & 5 deletions lib/utils/DB_connection.js → lib/utils/graph_connection.js
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
import {GraphDatabase} from 'neo4j';

//noinspection JSUnresolvedVariable
var db_host = process.env.NEO4J_HOST || 'http://localhost:7474';
var db = new GraphDatabase(process.env.NEO4J_HOST || 'http://localhost:7474');

var db = new GraphDatabase(db_host);

export default function cypher(query) {
export default function cypher(query, params) {
console.log('remote:Cypher', query);
return new Promise((resolve, reject) =>
db.cypher({query}, (err, data) => err ? reject(err) : resolve(data)));
db.cypher(params ? {query, params} : {query}, (err, data) => err ? reject(err) : resolve(data)));
};


Expand Down
27 changes: 27 additions & 0 deletions lib/utils/task_queue.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/**
* Important notice: all this stuff is expected to improve performance on graph operations
* Previously tasks were also stored in Neo4j, but I've encountered significant lack of speed
* */

import Redis from 'ioredis';
const db = new Redis(process.env.REDIS_HOST);

const taskPresenceKey = 'graph_dump:tasks:presenceHash';
const taskOrderKey = 'graph_dump:tasks:presenceList';

export async function addTask(taskData){
if (taskData instanceof Object) {
var taskString = JSON.stringify(taskData);
var exists = await db.hsetnx(taskPresenceKey, taskString, 1);
if (!exists) {
await db.rpush(taskOrderKey, taskString);
}
} else {
throw new TypeError;
}
}

export async function getTask(){
var task = await db.rpoplpush(taskOrderKey, taskOrderKey);
return JSON.parse(task);
}
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
"license": "Apache v2",
"dependencies": {
"babel": "^5.2.17",
"ioredis": "^1.2.5",
"ioredis": "^1.3.1",
"neo4j": "^2.0.0-RC1",
"node-fetch": "^1.2.1"
}
Expand Down

0 comments on commit 96b852e

Please sign in to comment.