-
Notifications
You must be signed in to change notification settings - Fork 8
/
db_upload.R
121 lines (91 loc) · 3.56 KB
/
db_upload.R
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
library(tidyverse)
library(dbplyr)
library(lubridate)
# Name of the CPS extract to be read; the gzipped download is "<filename>.gz".
filename <- "cps_00048.csv"

# Connect to the MySQL database (reached on localhost through the Cloud SQL proxy).
# Run in console: cloud_sql_proxy.exe -instances=nytint-stg:us-east1:stg-mysql-mhvl=tcp:3306
# https://console.cloud.google.com/sql/instances/stg-mysql-mhvl/overview?project=nytint-stg&authuser=1&duration=PT1H
# NOTE(review): `user` and `password` are not assigned anywhere in this script;
# they must already exist in the session before this runs.
cps_con <- src_mysql(
  dbname = "cass_cps_microdata",
  host = "127.0.0.1",
  user = user,
  password = password
)

# First-time setup:
# the extract is read in chunks with 'chunked' and saved into the MySQL DB.
library(chunked)
R.utils::gunzip(paste0(filename, ".gz"))

# Add a "YEAR-MONTH-01" date string to every chunk as it is read.
cps_data <- mutate(
  read_csv_chunkwise(file = filename),
  date = paste0(YEAR, "-", MONTH, "-01")
)

# Stream the chunks into the permanent table "cps_main".
insert_chunkwise_into(cps_data, cps_con, "cps_main", temporary = FALSE)

# From now on the table can be queried lazily via dbplyr.
cps <- tbl(cps_con, "cps_main")
# Flows are built one month pair at a time and stored in a database table.
# The first pair (January and February 1994) is built by hand here.

# Fetch both months in a single query, keeping positively weighted records only.
w <- collect(
  filter(cps, YEAR == 1994, MONTH %in% c(1, 2), WTFINL > 0)
)

# Match each person's January record (.x columns) to their February record
# (.y columns) on CPSIDP.
flow <- inner_join(
  filter(w, MONTH == 1),
  filter(w, MONTH == 2),
  by = "CPSIDP"
)

# Drop matches whose sex or race changed, or whose age went down or rose by
# three or more years between the two months.
flow <- filter(
  flow,
  SEX.x == SEX.y,
  RACE.x == RACE.y,
  AGE.y >= AGE.x,
  AGE.y < (AGE.x + 3)
)

# Rescale the second-month weights of the matched records so that they sum to
# the total second-month weight of everyone collected in `w`.
month2_weight <- as.numeric(summarize(filter(w, MONTH == 2), sum(WTFINL)))
adj <- 1 / (sum(flow$WTFINL.y) / month2_weight)
flow <- mutate(flow, flow_weight = WTFINL.y * adj)
# Map employment-status codes to a one-letter labour-force status.
#
# x: vector of status codes (the script passes the EMPSTAT.x / EMPSTAT.y columns).
# Returns a character vector the same length as `x`:
#   "E" for codes 10 and 12,
#   "U" for codes 20, 21 and 22,
#   "N" for codes 30 through 36,
#   NA  for anything else, including NA.
#
# Implemented with vectorized `%in%` masks instead of an element-wise sapply(),
# so the result is always character (also for empty or all-unmatched input)
# and no R-level loop runs over the rows.
status_gen <- function(x) {
  status <- rep(NA_character_, length(x))
  status[x %in% c(10, 12)] <- "E"
  status[x %in% c(20, 21, 22)] <- "U"
  status[x %in% 30:36] <- "N"
  status
}
# Label each matched person's status in month 1 (m1) and month 2 (m2), and
# combine the two letters into a flow code such as "EU".
flow <- flow %>%
  mutate(
    m1 = status_gen(EMPSTAT.x),
    m2 = status_gen(EMPSTAT.y),
    flow = paste0(m1, m2)
  )

# Weighted total per flow code. Printed explicitly so the summary is also
# shown when the script is run through source().
flow %>%
  group_by(flow) %>%
  summarize(total = sum(flow_weight)) %>%
  print()

# Open a DBI connection for writing results. The DBI generics are qualified
# because no library(DBI) call appears in this script.
con <- DBI::dbConnect(
  RMySQL::MySQL(),
  dbname = "cass_cps_microdata",
  host = "127.0.0.1",
  user = user,
  password = password
)

# Create the "flow" table from the first month pair.
DBI::dbWriteTable(con, "flow", flow)
# Repeat the same steps for every remaining pair of consecutive months and
# append each result to the "flow" table.
months <- seq.Date(ymd("1994-02-01"), ymd("2018-12-01"), by = "month")

# Stop one element short of the end: the last month has no following month,
# so pairing it would query with an NA date and always report a failure.
for (i in seq_len(length(months) - 1)) {
  date1 <- months[i]
  date2 <- months[i + 1]

  # Plain numbers, computed once, so the database query and the local filters
  # compare against exactly the same values.
  month1 <- month(date1)
  year1 <- year(date1)
  month2 <- month(date2)
  year2 <- year(date2)

  # Pull both months in one query, keeping positively weighted records only.
  w <- cps %>%
    filter(
      (MONTH == month1 & YEAR == year1) | (MONTH == month2 & YEAR == year2),
      WTFINL > 0
    ) %>%
    collect()

  # Match each person's first-month record (.x) to their second-month
  # record (.y) on CPSIDP.
  flow <- inner_join(
    filter(w, MONTH == month1, YEAR == year1),
    filter(w, MONTH == month2, YEAR == year2),
    by = "CPSIDP"
  )

  # No matched records for this pair: report it and move on.
  if (nrow(flow) == 0) {
    print(paste0("FAIL: ", date1))
    next
  }

  # Eliminate mismatches: sex or race changed, or age went down or rose by
  # three or more years.
  flow <- flow %>%
    filter(
      SEX.x == SEX.y,
      RACE.x == RACE.y,
      AGE.y >= AGE.x,
      AGE.y < (AGE.x + 3)
    )

  # Rescale matched second-month weights to the full second-month total.
  month2_weight <- w %>%
    filter(MONTH == month2, YEAR == year2) %>%
    summarize(sum(WTFINL)) %>%
    as.numeric()
  adj <- 1 / (sum(flow$WTFINL.y) / month2_weight)
  rm(w) # free the raw two-month extract before building the output

  # status_gen() is vectorized, so no rowwise() is needed here.
  flow <- flow %>%
    mutate(
      flow_weight = WTFINL.y * adj,
      m1 = status_gen(EMPSTAT.x),
      m2 = status_gen(EMPSTAT.y),
      flow = paste0(m1, m2)
    )

  # DBI generic qualified: no library(DBI) call appears in this script.
  DBI::dbWriteTable(con, "flow", flow, append = TRUE)
  rm(flow)
  print(months[i])
}

# All month pairs are written; release the write connection.
DBI::dbDisconnect(con)