Skip to content

Commit

Permalink
HELL DAMMIT IT FINALLY WORKS AS EXPECTED
Browse files Browse the repository at this point in the history
  • Loading branch information
Jabher committed May 31, 2015
1 parent 96b852e commit 17c5ed7
Show file tree
Hide file tree
Showing 8 changed files with 48 additions and 25 deletions.
9 changes: 8 additions & 1 deletion data/queries_test.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
[
"2 tone",
"2-step garage"
"2-step garage",
"4-beat",
"A cappella",
"A-F",
"Acid house",
"Acid jazz",
"Acid rock",
"Acoustic"
]
15 changes: 12 additions & 3 deletions lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,28 @@ var remaining_thread_count = Number(process.env.THREAD_COUNT) || 150;
const tasks = {reflexGroups, reflexUsers, reflexTracks};

export async function init() {
console.log('starting on ' + new Date());
await cypher('CREATE CONSTRAINT ON (group:VK_Group) ASSERT group.id IS UNIQUE');
await cypher('CREATE CONSTRAINT ON (post:VK_Wall_Post) ASSERT post.id IS UNIQUE');
await cypher('CREATE CONSTRAINT ON (track:VK_Track) ASSERT track.id IS UNIQUE');
await cypher('CREATE CONSTRAINT ON (user:VK_User) ASSERT user.id IS UNIQUE');
await cypher('CREATE CONSTRAINT ON (artist:Artist) ASSERT artist.id IS UNIQUE');
for (let query of queries)
await addTask(['reflexGroups', query]);
while (remaining_thread_count--)
await start_thread();
/*important! not an async call, that's an fibering*/
start_thread();
}

//noinspection InfiniteRecursionJS
async function start_thread() {
try {
await (await getNextTask());
} catch (e) {
console.log(e, e.stack);
process.exit(1);
console.log(e);
console.log(e.stack);
process.nextTick(()=> process.exit(1));
throw e;
} finally {
await start_thread()
}
Expand Down
1 change: 0 additions & 1 deletion lib/reflectors/groups_reflection.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import callVK from '../utils/VK_stack_caller.js';
//noinspection JSUnusedGlobalSymbols
export default async function reflexGroupsFromQuery(query) {
let {items: groups} = await callVK('groups.search', {q: query, type: 'page', count: 1000});
console.log(groups);
if (Array.isArray(groups))
await cypher(
`FOREACH (id in {ids} | MERGE (group:VK_Group {id: id}))`,
Expand Down
25 changes: 16 additions & 9 deletions lib/reflectors/tracks_reflection.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,23 @@ export default async function reflexTracksForGroup(group_id, remainingTrackCount
remainingTrackCount -= tracks.length;
remainingWallPostCount -= posts.length;
await cypher(`
MATCH (group:VK_Group {id: ${group_id} })
MERGE (post:VK_Wall_Post { id: ${post.id}, date: ${post.date} })
CREATE UNIQUE (group)-[:Published]-(post)
FOREACH (track IN {tracks} |
MERGE (trackNode:VK_Track {id: track.id, title: track.title, duration: track.duration})
MERGE (artist:Artist {name: track.artist})
CREATE UNIQUE (post)-[:Attached]->(trackNode)<-[:Performed]-(artist)
)
`,
MATCH (group:VK_Group {id: {group_id} })
MERGE (post:VK_Wall_Post { id: {post_id}})
ON CREATE SET post.date={post_date}
MERGE (group)-[:Published]->(post)
FOREACH (track IN {tracks} |
MERGE (trackNode:VK_Track {id: track.id})
ON CREATE SET trackNode.title=track.title,
trackNode.duration=track.duration
MERGE (artist:Artist {name: track.artist})
MERGE (trackNode)<-[:Performed]-(artist)
MERGE (post)-[:Attached]->(trackNode)
)`,
{
group_id,
post_id: post.id,
post_date: post.date,
tracks: tracks.map(track => ({
id : track.id,
title : track.title,
Expand Down
13 changes: 6 additions & 7 deletions lib/reflectors/users_reflection.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,11 @@ export default async function reflexUsersForGroup(group_id) {
do {
var {items: users} = await callVK('groups.getMembers', {group_id: group_id, count, offset: cursor++ * count});
await cypher(`
MATCH (g:VK_Group { id: ${group_id} })
FOREACH (id IN {users} |
MERGE (u:VK_User { id: id })
CREATE UNIQUE (u)-[r:Member]->(g)
)
`,
{users});
MATCH (g:VK_Group { id: {group_id} })
FOREACH (id IN {users} |
MERGE (u:VK_User { id: id })
CREATE UNIQUE (u)-[:Member]->(g)
)`,
{group_id, users});
} while (users.length !== 0)
}
6 changes: 4 additions & 2 deletions lib/utils/VK_stack_caller.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,15 @@ async function loop() {
${queries.map(q => `result.push(API.${q.method}(${JSON.stringify(q.args, replace_quotes)}));`).join('\n')}
return result;`;
call('execute', {v, code})
.then(({execute_errors, error, response = []}) =>
.then(({execute_errors, error, response = []}) => {
console.log(execute_errors, error, response.length, stack.length);
stack.push(...queries.filter((query, index) => {
if (response[index] || response[index] === '')
query.resolve(response[index]);
else
return true;
})));
}))
});

await new Promise(res => setTimeout(res, 450));
await loop();
Expand Down
2 changes: 1 addition & 1 deletion lib/utils/graph_connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,6 @@ export default function cypher(query, params) {
};


export function escape(string) {
export function escape(string: string) {
return '"' + string.replace(/"/g, '\\"') + '"';
}
2 changes: 1 addition & 1 deletion lib/utils/task_queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ const db = new Redis(process.env.REDIS_HOST);
const taskPresenceKey = 'graph_dump:tasks:presenceHash';
const taskOrderKey = 'graph_dump:tasks:presenceList';

export async function addTask(taskData){
export async function addTask(taskData: object){
if (taskData instanceof Object) {
var taskString = JSON.stringify(taskData);
var exists = await db.hsetnx(taskPresenceKey, taskString, 1);
Expand Down

0 comments on commit 17c5ed7

Please sign in to comment.