Custom Storage

Matt Bossenbroek edited this page Dec 10, 2015 · 1 revision

If you have an existing storage format that you want to write to, here's how to use it with PigPen.

For reading data, see: Custom Loaders

Similar to a custom loader, we're going to wrap our custom storage as a new PigPen command. Here's an example:

(require '[pigpen.raw :as raw])

(defn store-foo
  ([location relation]
    (store-foo location {} relation))
  ([location opts relation]
    (->> relation
      (raw/bind$ ;; all of the pigpen commands work with a single binary field
                 ;; called 'value. The bind command changes that value into raw
                 ;; types that your storage can consume. If you just want the
                 ;; clojure data, just use '(pigpen.core.op/map->bind identity)
        '[pigpen.core.op]  ;; require this namespace at runtime
        '(pigpen.core.op/map->bind pr-str) ;; This function takes the single value
                                           ;; and converts it into potentially
                                           ;; multiple fields.
                                           ;; See http://netflix.github.io/PigPen/pigpen.core.op.html#var-bind
        {:field-type :native})  ;; This tells bind not to serialize the output
      (raw/store$             ;; This is the raw store command
        location              ;; The location to write to (must be a string)
        :my-custom-storage    ;; This indicates the type of storage to use
        opts))))              ;; Any options to pass to your storage

This command does two things: it prepares the data by converting each serialized value into something the storage can write, and then it calls the custom store command.
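To make the preparation step concrete, here is a minimal plain-Clojure sketch of what applying `pr-str` to each record produces. It runs without PigPen; the sample records and the `prepare` helper are made up for illustration and are not part of PigPen's API.

```clojure
(require '[clojure.edn :as edn])

;; Hypothetical sample records, standing in for the values in a relation.
(def records [{:a 1} [1 2] "foo"])

;; Hypothetical helper: turn each value into the string a storage would write.
;; This mirrors the per-record effect of using pr-str in the bind step.
(defn prepare
  [values]
  (mapv pr-str values))

(prepare records)
;; => ["{:a 1}" "[1 2]" "\"foo\""]

;; Reading the strings back as EDN recovers the original values.
(mapv edn/read-string (prepare records))
;; => [{:a 1} [1 2] "foo"]
```

Because `pr-str` output is readable Clojure data, anything written this way can be parsed again later; a storage that expects a different text format would need a different function in place of `pr-str`.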

The next step is to provide a local implementation for our custom storage. This step is only required if you want to use the new storage locally (in the REPL). We do this by implementing the following multimethod:

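;; Note: `s`, `m`, and the unqualified `store` are not defined on this page.
;; They presumably refer to schema.core, pigpen.model, and the
;; pigpen.local/store multimethod; require or alias them before evaluating.
(require '[pigpen.local :refer [PigPenLocalStorage]])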

(s/defmethod store :my-custom-storage
  [{:keys [location]} :- m/Store]
  (reify PigPenLocalStorage
    (init-writer [_]
      ;; initialize the writer using `location`
      )
    (write [_ writer value]
      ;; use `writer` to write a single value
      )
    (close-writer [_ writer]
      ;; close the writer created by `init-writer`
      )))
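As a rough illustration of how the three methods fit together, the sketch below writes one value per line to a file. So that it runs without PigPen, it declares its own stand-in protocol, `LocalStorageSketch`, with the same method names as above; the protocol, `line-storage`, and `write-all!` are hypothetical and only approximate how the real `PigPenLocalStorage` is driven.

```clojure
(require '[clojure.java.io :as io])

;; Hypothetical stand-in for PigPenLocalStorage, declared here only so the
;; example is self-contained. The method names mirror the ones above.
(defprotocol LocalStorageSketch
  (init-writer [this])
  (write [this writer value])
  (close-writer [this writer]))

(defn line-storage
  "Returns a storage that writes each value on its own line at `location`."
  [location]
  (reify LocalStorageSketch
    (init-writer [_]
      ;; Open a java.io.Writer for the target location.
      (io/writer location))
    (write [_ writer value]
      ;; Write a single value followed by a newline.
      (.write ^java.io.Writer writer (str value "\n")))
    (close-writer [_ writer]
      ;; Flush and close the writer created by init-writer.
      (.close ^java.io.Writer writer))))

(defn write-all!
  "Assumed lifecycle: open once, write every value, always close."
  [storage values]
  (let [writer (init-writer storage)]
    (try
      (doseq [v values]
        (write storage writer v))
      (finally
        (close-writer storage writer)))))
```

For example, `(write-all! (line-storage "output.foo") ["a" "b"])` leaves a file containing the two lines `a` and `b`.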

After that, we can provide a Pig script implementation of our storage, using the opts passed earlier if needed:

(require '[pigpen.pig.script])

(defmethod pigpen.pig.script/storage->script [:store :my-custom-storage]
  [{:keys [opts]}]
  "my.custom.Storage()")
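If the storage class takes constructor arguments, the returned string can be built from `opts`. The helper below is a minimal sketch, assuming a hypothetical `:args` entry in `opts` that holds the values to pass; neither `:args` nor `storage-expr` is part of PigPen, and the helper does not escape quotes inside the arguments.

```clojure
(require '[clojure.string :as string])

;; Hypothetical helper: render a storage expression for a Pig script,
;; with each argument wrapped in single quotes as a string literal.
(defn storage-expr
  [class-name args]
  (str class-name
       "("
       (string/join ", " (map #(str "'" % "'") args))
       ")"))

;; `:args` is an assumed option key, used here only for illustration.
(def opts {:args ["gzip" 10]})

(storage-expr "my.custom.Storage" (:args opts))
;; => "my.custom.Storage('gzip', '10')"

(storage-expr "my.custom.Storage" [])
;; => "my.custom.Storage()"
```

A `storage->script` method could call such a helper with the `opts` it receives instead of returning a fixed string.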

If you need Pig to load any additional jars, you can specify them with this multimethod:

(require '[pigpen.pig.oven])

(defmethod pigpen.pig.oven/storage->references :my-custom-storage
  [_]
  ["storage.jar"])

And we're done! Now we can use our new command just like any other:

(require '[pigpen.core :as pig])

(defn use-my-storage []
  (->>
    (pig/load-tsv "input.tsv")
    (pig/map count)
    (store-foo "output.foo")))  ;; store-foo is the function defined above, not part of pigpen.core