diff --git a/api/api.go b/api/api.go index fa6ad4f23a..57633cbbef 100644 --- a/api/api.go +++ b/api/api.go @@ -21,6 +21,7 @@ package api import ( "archive/tar" + "bytes" "context" "crypto/ecdsa" "encoding/hex" @@ -28,13 +29,11 @@ import ( "fmt" "io" "math/big" + "mime" "net/http" "path" - "strings" - - "bytes" - "mime" "path/filepath" + "strings" "time" "github.com/ethereum/go-ethereum/common" @@ -47,8 +46,7 @@ import ( "github.com/ethersphere/swarm/storage" "github.com/ethersphere/swarm/storage/feed" "github.com/ethersphere/swarm/storage/feed/lookup" - - opentracing "github.com/opentracing/opentracing-go" + "github.com/opentracing/opentracing-go" ) var ( @@ -400,6 +398,8 @@ func (a *API) Get(ctx context.Context, decrypt DecryptFunc, manifestAddr storage return } +// Delete handles removing a file from the manifest. +// This creates a new manifest without the given path func (a *API) Delete(ctx context.Context, addr string, path string) (storage.Address, error) { apiDeleteCount.Inc(1) uri, err := Parse("bzz:/" + addr) diff --git a/api/client/client.go b/api/client/client.go index ef0ac0cbeb..cba4c8c2be 100644 --- a/api/client/client.go +++ b/api/client/client.go @@ -63,7 +63,7 @@ type Client struct { // UploadRaw uploads raw data to swarm and returns the resulting hash. 
If toEncrypt is true it // uploads encrypted data -func (c *Client) UploadRaw(r io.Reader, size int64, toEncrypt bool) (string, error) { +func (c *Client) UploadRaw(r io.Reader, size int64, toEncrypt bool, toPin bool) (string, error) { if size <= 0 { return "", errors.New("data size must be greater than zero") } @@ -78,6 +78,11 @@ func (c *Client) UploadRaw(r io.Reader, size int64, toEncrypt bool) (string, err req.ContentLength = size req.Header.Set(swarmhttp.SwarmTagHeaderName, fmt.Sprintf("raw_upload_%d", time.Now().Unix())) + // Set the pinning header if the file needs to be pinned + if toPin { + req.Header.Set(swarmhttp.PinHeaderName, "true") + } + res, err := http.DefaultClient.Do(req) if err != nil { return "", err @@ -151,11 +156,11 @@ func Open(path string) (*File, error) { // (if the manifest argument is non-empty) or creates a new manifest containing // the file, returning the resulting manifest hash (the file will then be // available at bzz://) -func (c *Client) Upload(file *File, manifest string, toEncrypt bool) (string, error) { +func (c *Client) Upload(file *File, manifest string, toEncrypt bool, toPin bool) (string, error) { if file.Size <= 0 { return "", errors.New("file size must be greater than zero") } - return c.TarUpload(manifest, &FileUploader{file}, "", toEncrypt) + return c.TarUpload(manifest, &FileUploader{file}, "", toEncrypt, toPin) } // Download downloads a file with the given path from the swarm manifest with @@ -185,7 +190,7 @@ func (c *Client) Download(hash, path string) (*File, error) { // directory will then be available at bzz://path/to/file), with // the file specified in defaultPath being uploaded to the root of the manifest // (i.e. 
bzz://) -func (c *Client) UploadDirectory(dir, defaultPath, manifest string, toEncrypt bool) (string, error) { +func (c *Client) UploadDirectory(dir, defaultPath, manifest string, toEncrypt bool, toPin bool) (string, error) { stat, err := os.Stat(dir) if err != nil { return "", err @@ -200,7 +205,7 @@ func (c *Client) UploadDirectory(dir, defaultPath, manifest string, toEncrypt bo return "", fmt.Errorf("default path: %v", err) } } - return c.TarUpload(manifest, &DirectoryUploader{dir}, defaultPath, toEncrypt) + return c.TarUpload(manifest, &DirectoryUploader{dir}, defaultPath, toEncrypt, toPin) } // DownloadDirectory downloads the files contained in a swarm manifest under @@ -358,12 +363,13 @@ func (c *Client) DownloadFile(hash, path, dest, credentials string) error { } // UploadManifest uploads the given manifest to swarm -func (c *Client) UploadManifest(m *api.Manifest, toEncrypt bool) (string, error) { +func (c *Client) UploadManifest(m *api.Manifest, toEncrypt bool, toPin bool) (string, error) { data, err := json.Marshal(m) if err != nil { return "", err } - return c.UploadRaw(bytes.NewReader(data), int64(len(data)), toEncrypt) + + return c.UploadRaw(bytes.NewReader(data), int64(len(data)), toEncrypt, toPin) } // DownloadManifest downloads a swarm manifest @@ -498,7 +504,7 @@ type UploadFn func(file *File) error // TarUpload uses the given Uploader to upload files to swarm as a tar stream, // returning the resulting manifest hash -func (c *Client) TarUpload(hash string, uploader Uploader, defaultPath string, toEncrypt bool) (string, error) { +func (c *Client) TarUpload(hash string, uploader Uploader, defaultPath string, toEncrypt bool, toPin bool) (string, error) { ctx, sp := spancontext.StartSpan(context.Background(), "api.client.tarupload") defer sp.Finish() @@ -540,6 +546,11 @@ func (c *Client) TarUpload(hash string, uploader Uploader, defaultPath string, t req.Header.Set(swarmhttp.SwarmTagHeaderName, tag) + // Set the pinning header if the file is to be 
pinned + if toPin { + req.Header.Set(swarmhttp.PinHeaderName, "true") + } + // use 'Expect: 100-continue' so we don't send the request body if // the server refuses the request req.Header.Set("Expect", "100-continue") @@ -591,7 +602,7 @@ func (c *Client) TarUpload(hash string, uploader Uploader, defaultPath string, t // MultipartUpload uses the given Uploader to upload files to swarm as a // multipart form, returning the resulting manifest hash -func (c *Client) MultipartUpload(hash string, uploader Uploader) (string, error) { +func (c *Client) MultipartUpload(hash string, uploader Uploader, toPin bool) (string, error) { reqR, reqW := io.Pipe() defer reqR.Close() req, err := http.NewRequest("POST", c.Gateway+"/bzz:/"+hash, reqR) @@ -606,6 +617,9 @@ func (c *Client) MultipartUpload(hash string, uploader Uploader) (string, error) mw := multipart.NewWriter(reqW) req.Header.Set("Content-Type", fmt.Sprintf("multipart/form-data; boundary=%q", mw.Boundary())) req.Header.Set(swarmhttp.SwarmTagHeaderName, fmt.Sprintf("multipart_upload_%d", time.Now().Unix())) + if toPin { + req.Header.Set(swarmhttp.PinHeaderName, "true") + } // define an UploadFn which adds files to the multipart form uploadFn := func(file *File) error { diff --git a/api/client/client_test.go b/api/client/client_test.go index d4dddb04bb..1f2eb64864 100644 --- a/api/client/client_test.go +++ b/api/client/client_test.go @@ -36,39 +36,47 @@ import ( ) func serverFunc(api *api.API) swarmhttp.TestServer { - return swarmhttp.NewServer(api, "") + return swarmhttp.NewServer(api, nil, "") } // TestClientUploadDownloadRaw test uploading and downloading raw data to swarm func TestClientUploadDownloadRaw(t *testing.T) { - testClientUploadDownloadRaw(false, t) + srv := swarmhttp.NewTestSwarmServer(t, serverFunc, nil, nil) + defer srv.Close() + + data := []byte("foo123") + testClientUploadDownloadRaw(srv, false, t, data, false) + + // check the tag was created successfully + tag := srv.Tags.All()[0] + 
testutil.CheckTag(t, tag, 1, 1, 0, 1) } func TestClientUploadDownloadRawEncrypted(t *testing.T) { + if testutil.RaceEnabled { t.Skip("flaky with -race on Travis") // See: https://github.com/ethersphere/go-ethereum/issues/1254 } - testClientUploadDownloadRaw(true, t) -} - -func testClientUploadDownloadRaw(toEncrypt bool, t *testing.T) { - srv := swarmhttp.NewTestSwarmServer(t, serverFunc, nil) + srv := swarmhttp.NewTestSwarmServer(t, serverFunc, nil, nil) defer srv.Close() - client := NewClient(srv.URL) - - // upload some raw data data := []byte("foo123") - hash, err := client.UploadRaw(bytes.NewReader(data), int64(len(data)), toEncrypt) - if err != nil { - t.Fatal(err) - } + testClientUploadDownloadRaw(srv, true, t, data, false) // check the tag was created successfully tag := srv.Tags.All()[0] testutil.CheckTag(t, tag, 1, 1, 0, 1) +} + +func testClientUploadDownloadRaw(srv *swarmhttp.TestSwarmServer, toEncrypt bool, t *testing.T, data []byte, toPin bool) string { + client := NewClient(srv.URL) + + hash, err := client.UploadRaw(bytes.NewReader(data), int64(len(data)), toEncrypt, toPin) + if err != nil { + t.Fatal(err) + } // check we can download the same data res, isEncrypted, err := client.DownloadRaw(hash) @@ -86,6 +94,8 @@ func testClientUploadDownloadRaw(toEncrypt bool, t *testing.T) { if !bytes.Equal(gotData, data) { t.Fatalf("expected downloaded data to be %q, got %q", data, gotData) } + + return hash } // TestClientUploadDownloadFiles test uploading and downloading files to swarm @@ -98,12 +108,13 @@ func TestClientUploadDownloadFilesEncrypted(t *testing.T) { testClientUploadDownloadFiles(true, t) } -func testClientUploadDownloadFiles(toEncrypt bool, t *testing.T) { - srv := swarmhttp.NewTestSwarmServer(t, serverFunc, nil) +func testClientUploadDownloadFiles(toEncrypt bool, t *testing.T) string { + + srv := swarmhttp.NewTestSwarmServer(t, serverFunc, nil, nil) defer srv.Close() client := NewClient(srv.URL) - upload := func(manifest, path string, data 
[]byte) string { + upload := func(manifest, path string, data []byte, toPin bool) string { file := &File{ ReadCloser: ioutil.NopCloser(bytes.NewReader(data)), ManifestEntry: api.ManifestEntry{ @@ -112,7 +123,7 @@ func testClientUploadDownloadFiles(toEncrypt bool, t *testing.T) { Size: int64(len(data)), }, } - hash, err := client.Upload(file, manifest, toEncrypt) + hash, err := client.Upload(file, manifest, toEncrypt, toPin) if err != nil { t.Fatal(err) } @@ -141,25 +152,27 @@ func testClientUploadDownloadFiles(toEncrypt bool, t *testing.T) { // upload a file to the root of a manifest rootData := []byte("some-data") - rootHash := upload("", "", rootData) + rootHash := upload("", "", rootData, false) // check we can download the root file checkDownload(rootHash, "", rootData) // upload another file to the same manifest otherData := []byte("some-other-data") - newHash := upload(rootHash, "some/other/path", otherData) + newHash := upload(rootHash, "some/other/path", otherData, false) // check we can download both files from the new manifest checkDownload(newHash, "", rootData) checkDownload(newHash, "some/other/path", otherData) // replace the root file with different data - newHash = upload(newHash, "", otherData) + newHash = upload(newHash, "", otherData, false) // check both files have the other data checkDownload(newHash, "", otherData) checkDownload(newHash, "some/other/path", otherData) + + return newHash } var testDirFiles = []string{ @@ -194,10 +207,10 @@ func newTestDirectory(t *testing.T) string { return dir } -// TestClientUploadDownloadDirectory tests uploading and downloading a +// TestClientUploadDownloadDirectory tests uploading and downloading // directory of files to a swarm manifest func TestClientUploadDownloadDirectory(t *testing.T) { - srv := swarmhttp.NewTestSwarmServer(t, serverFunc, nil) + srv := swarmhttp.NewTestSwarmServer(t, serverFunc, nil, nil) defer srv.Close() dir := newTestDirectory(t) @@ -206,7 +219,7 @@ func 
TestClientUploadDownloadDirectory(t *testing.T) { // upload the directory client := NewClient(srv.URL) defaultPath := testDirFiles[0] - hash, err := client.UploadDirectory(dir, defaultPath, "", false) + hash, err := client.UploadDirectory(dir, defaultPath, "", false, false) if err != nil { t.Fatalf("error uploading directory: %s", err) } @@ -267,14 +280,14 @@ func TestClientFileListEncrypted(t *testing.T) { } func testClientFileList(toEncrypt bool, t *testing.T) { - srv := swarmhttp.NewTestSwarmServer(t, serverFunc, nil) + srv := swarmhttp.NewTestSwarmServer(t, serverFunc, nil, nil) defer srv.Close() dir := newTestDirectory(t) defer os.RemoveAll(dir) client := NewClient(srv.URL) - hash, err := client.UploadDirectory(dir, "", "", toEncrypt) + hash, err := client.UploadDirectory(dir, "", "", toEncrypt, false) if err != nil { t.Fatalf("error uploading directory: %s", err) } @@ -325,14 +338,14 @@ func testClientFileList(toEncrypt bool, t *testing.T) { // TestClientMultipartUpload tests uploading files to swarm using a multipart // upload func TestClientMultipartUpload(t *testing.T) { - srv := swarmhttp.NewTestSwarmServer(t, serverFunc, nil) + srv := swarmhttp.NewTestSwarmServer(t, serverFunc, nil, nil) defer srv.Close() // define an uploader which uploads testDirFiles with some data // note: this test should result in SEEN chunks. 
assert accordingly - data := []byte("some-data") uploader := UploaderFunc(func(upload UploadFn) error { for _, name := range testDirFiles { + data := []byte(name) file := &File{ ReadCloser: ioutil.NopCloser(bytes.NewReader(data)), ManifestEntry: api.ManifestEntry{ @@ -350,14 +363,14 @@ func TestClientMultipartUpload(t *testing.T) { // upload the files as a multipart upload client := NewClient(srv.URL) - hash, err := client.MultipartUpload("", uploader) + hash, err := client.MultipartUpload("", uploader, false) if err != nil { t.Fatal(err) } // check the tag was created successfully tag := srv.Tags.All()[0] - testutil.CheckTag(t, tag, 9, 9, 7, 9) + testutil.CheckTag(t, tag, 9, 9, 0, 9) // check we can download the individual files checkDownloadFile := func(path string) { @@ -370,8 +383,9 @@ func TestClientMultipartUpload(t *testing.T) { if err != nil { t.Fatal(err) } - if !bytes.Equal(gotData, data) { - t.Fatalf("expected data to be %q, got %q", data, gotData) + // The content is the file name just to make them different + if !bytes.Equal(gotData, []byte(path)) { + t.Fatalf("expected data to be %q, got %q", path, gotData) } } for _, file := range testDirFiles { @@ -398,7 +412,7 @@ func TestClientBzzWithFeed(t *testing.T) { signer, _ := newTestSigner() // Initialize a Swarm test server - srv := swarmhttp.NewTestSwarmServer(t, serverFunc, nil) + srv := swarmhttp.NewTestSwarmServer(t, serverFunc, nil, nil) swarmClient := NewClient(srv.URL) defer srv.Close() @@ -433,7 +447,7 @@ func TestClientBzzWithFeed(t *testing.T) { } // upload data to bzz:// and retrieve the content-addressed manifest hash, hex-encoded. 
- manifestAddressHex, err := swarmClient.Upload(f, "", false) + manifestAddressHex, err := swarmClient.Upload(f, "", false, false) if err != nil { t.Fatalf("Error creating manifest: %s", err) } @@ -516,7 +530,7 @@ func TestClientCreateUpdateFeed(t *testing.T) { signer, _ := newTestSigner() - srv := swarmhttp.NewTestSwarmServer(t, serverFunc, nil) + srv := swarmhttp.NewTestSwarmServer(t, serverFunc, nil, nil) client := NewClient(srv.URL) defer srv.Close() diff --git a/api/http/response_test.go b/api/http/response_test.go index 486c19ab0e..431f08f86f 100644 --- a/api/http/response_test.go +++ b/api/http/response_test.go @@ -27,7 +27,7 @@ import ( ) func TestError(t *testing.T) { - srv := NewTestSwarmServer(t, serverFunc, nil) + srv := NewTestSwarmServer(t, serverFunc, nil, nil) defer srv.Close() var resp *http.Response @@ -53,7 +53,7 @@ func TestError(t *testing.T) { } func Test404Page(t *testing.T) { - srv := NewTestSwarmServer(t, serverFunc, nil) + srv := NewTestSwarmServer(t, serverFunc, nil, nil) defer srv.Close() var resp *http.Response @@ -79,7 +79,7 @@ func Test404Page(t *testing.T) { } func Test500Page(t *testing.T) { - srv := NewTestSwarmServer(t, serverFunc, nil) + srv := NewTestSwarmServer(t, serverFunc, nil, nil) defer srv.Close() var resp *http.Response @@ -104,7 +104,7 @@ func Test500Page(t *testing.T) { } } func Test500PageWith0xHashPrefix(t *testing.T) { - srv := NewTestSwarmServer(t, serverFunc, nil) + srv := NewTestSwarmServer(t, serverFunc, nil, nil) defer srv.Close() var resp *http.Response @@ -134,7 +134,7 @@ func Test500PageWith0xHashPrefix(t *testing.T) { } func TestJsonResponse(t *testing.T) { - srv := NewTestSwarmServer(t, serverFunc, nil) + srv := NewTestSwarmServer(t, serverFunc, nil, nil) defer srv.Close() var resp *http.Response diff --git a/api/http/server.go b/api/http/server.go index e13fb11b42..5617fddeec 100644 --- a/api/http/server.go +++ b/api/http/server.go @@ -44,6 +44,7 @@ import ( "github.com/ethersphere/swarm/sctx" 
"github.com/ethersphere/swarm/storage" "github.com/ethersphere/swarm/storage/feed" + "github.com/ethersphere/swarm/storage/pin" "github.com/rs/cors" ) @@ -63,7 +64,10 @@ var ( getListFail = metrics.NewRegisteredCounter("api.http.get.list.fail", nil) ) -const SwarmTagHeaderName = "x-swarm-tag" +const ( + SwarmTagHeaderName = "x-swarm-tag" // Presence of this in header indicates the tag + PinHeaderName = "x-swarm-pin" // Presence of this in header indicates pinning required +) type methodHandler map[string]http.Handler @@ -76,7 +80,7 @@ func (m methodHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) { rw.WriteHeader(http.StatusMethodNotAllowed) } -func NewServer(api *api.API, corsString string) *Server { +func NewServer(api *api.API, pinAPI *pin.API, corsString string) *Server { var allowedOrigins []string for _, domain := range strings.Split(corsString, ",") { allowedOrigins = append(allowedOrigins, strings.TrimSpace(domain)) @@ -88,7 +92,7 @@ func NewServer(api *api.API, corsString string) *Server { AllowedHeaders: []string{"*"}, }) - server := &Server{api: api} + server := &Server{api: api, pinAPI: pinAPI} defaultMiddlewares := []Adapter{ RecoverPanic, @@ -183,6 +187,7 @@ func (s *Server) ListenAndServe(addr string) error { type Server struct { http.Handler api *api.API + pinAPI *pin.API listenAddr string } @@ -255,6 +260,9 @@ func (s *Server) HandlePostRaw(w http.ResponseWriter, r *http.Request) { toEncrypt = true } + // Set the pinCounter if there is a pin header present in the request + headerPin := r.Header.Get(PinHeaderName) + if uri.Path != "" { postRawFail.Inc(1) respondError(w, r, "raw POST request cannot contain a path", http.StatusBadRequest) @@ -285,6 +293,16 @@ func (s *Server) HandlePostRaw(w http.ResponseWriter, r *http.Request) { log.Debug("stored content", "ruid", ruid, "key", addr) + // Add the root hash of the RAW file in the pinFilesIndex + if strings.ToLower(headerPin) == "true" { + err = s.pinAPI.PinFiles(addr, true, "") + if err != 
nil { + postRawFail.Inc(1) + respondError(w, r, fmt.Sprintf("Error pinning file : %s", addr.Hex()), http.StatusInternalServerError) + return + } + } + w.Header().Set("Content-Type", "text/plain") w.WriteHeader(http.StatusOK) fmt.Fprint(w, addr) @@ -313,6 +331,9 @@ func (s *Server) HandlePostFiles(w http.ResponseWriter, r *http.Request) { toEncrypt = true } + // Set the pinCounter if there is a pin header present in the request + headerPin := r.Header.Get(PinHeaderName) + var addr storage.Address if uri.Addr != "" && uri.Addr != "encrypt" { addr, err = s.api.Resolve(r.Context(), uri.Addr) @@ -362,6 +383,16 @@ func (s *Server) HandlePostFiles(w http.ResponseWriter, r *http.Request) { log.Debug("done splitting, setting tag total", "SPLIT", tag.Get(chunk.StateSplit), "TOTAL", tag.Total()) tag.DoneSplit(newAddr) + // Pin the file + if strings.ToLower(headerPin) == "true" { + err = s.pinAPI.PinFiles(newAddr, false, "") + if err != nil { + postFilesFail.Inc(1) + respondError(w, r, fmt.Sprintf("Error pinning file : %s", newAddr.Hex()), http.StatusInternalServerError) + return + } + } + log.Debug("stored content", "ruid", ruid, "key", newAddr) w.Header().Set("Content-Type", "text/plain") @@ -464,6 +495,7 @@ func (s *Server) HandleDelete(w http.ResponseWriter, r *http.Request) { uri := GetURI(r.Context()) log.Debug("handle.delete", "ruid", ruid) deleteCount.Inc(1) + newKey, err := s.api.Delete(r.Context(), uri.Addr, uri.Path) if err != nil { deleteFail.Inc(1) diff --git a/api/http/server_test.go b/api/http/server_test.go index 3f0a868047..6ec917025f 100644 --- a/api/http/server_test.go +++ b/api/http/server_test.go @@ -37,8 +37,6 @@ import ( "testing" "time" - "github.com/ethersphere/swarm/storage/feed/lookup" - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" @@ -46,6 +44,7 @@ import ( "github.com/ethersphere/swarm/api" "github.com/ethersphere/swarm/storage" 
"github.com/ethersphere/swarm/storage/feed" + "github.com/ethersphere/swarm/storage/feed/lookup" "github.com/ethersphere/swarm/testutil" ) @@ -56,7 +55,7 @@ func init() { } func serverFunc(api *api.API) TestServer { - return NewServer(api, "") + return NewServer(api, nil, "") } func newTestSigner() (*feed.GenericSigner, error) { @@ -78,7 +77,7 @@ func TestBzzWithFeed(t *testing.T) { signer, _ := newTestSigner() // Initialize Swarm test server - srv := NewTestSwarmServer(t, serverFunc, nil) + srv := NewTestSwarmServer(t, serverFunc, nil, nil) defer srv.Close() // put together some data for our test: @@ -196,7 +195,7 @@ func TestBzzWithFeed(t *testing.T) { // Test Swarm feeds using the raw update methods func TestBzzFeed(t *testing.T) { - srv := NewTestSwarmServer(t, serverFunc, nil) + srv := NewTestSwarmServer(t, serverFunc, nil, nil) signer, _ := newTestSigner() defer srv.Close() @@ -473,7 +472,7 @@ func testBzzGetPath(encrypted bool, t *testing.T) { addr := [3]storage.Address{} - srv := NewTestSwarmServer(t, serverFunc, nil) + srv := NewTestSwarmServer(t, serverFunc, nil, nil) defer srv.Close() for i, mf := range testmanifest { @@ -711,7 +710,7 @@ func TestBzzTar(t *testing.T) { } func testBzzTar(encrypted bool, t *testing.T) { - srv := NewTestSwarmServer(t, serverFunc, nil) + srv := NewTestSwarmServer(t, serverFunc, nil, nil) defer srv.Close() fileNames := []string{"tmp1.txt", "tmp2.lock", "tmp3.rtf"} fileContents := []string{"tmp1textfilevalue", "tmp2lockfilelocked", "tmp3isjustaplaintextfile"} @@ -848,7 +847,7 @@ func testBzzTar(encrypted bool, t *testing.T) { // by chunk size (4096). 
It is needed to be checked BEFORE chunking is done, therefore // concurrency was introduced to slow down the HTTP request func TestBzzCorrectTagEstimate(t *testing.T) { - srv := NewTestSwarmServer(t, serverFunc, nil) + srv := NewTestSwarmServer(t, serverFunc, nil, nil) defer srv.Close() for _, v := range []struct { @@ -921,7 +920,7 @@ func TestBzzRootRedirectEncrypted(t *testing.T) { } func testBzzRootRedirect(toEncrypt bool, t *testing.T) { - srv := NewTestSwarmServer(t, serverFunc, nil) + srv := NewTestSwarmServer(t, serverFunc, nil, nil) defer srv.Close() // create a manifest with some data at the root path @@ -968,7 +967,7 @@ func testBzzRootRedirect(toEncrypt bool, t *testing.T) { } func TestMethodsNotAllowed(t *testing.T) { - srv := NewTestSwarmServer(t, serverFunc, nil) + srv := NewTestSwarmServer(t, serverFunc, nil, nil) defer srv.Close() databytes := "bar" for _, c := range []struct { @@ -1027,7 +1026,7 @@ func httpDo(httpMethod string, url string, reqBody io.Reader, headers map[string } func TestGet(t *testing.T) { - srv := NewTestSwarmServer(t, serverFunc, nil) + srv := NewTestSwarmServer(t, serverFunc, nil, nil) defer srv.Close() for _, testCase := range []struct { @@ -1110,7 +1109,7 @@ func TestGet(t *testing.T) { } func TestModify(t *testing.T) { - srv := NewTestSwarmServer(t, serverFunc, nil) + srv := NewTestSwarmServer(t, serverFunc, nil, nil) defer srv.Close() headers := map[string]string{"Content-Type": "text/plain"} res, hash := httpDo("POST", srv.URL+"/bzz:/", bytes.NewReader([]byte("data")), headers, false, t) @@ -1200,7 +1199,7 @@ func TestMultiPartUpload(t *testing.T) { // POST /bzz:/ Content-Type: multipart/form-data verbose := false // Setup Swarm - srv := NewTestSwarmServer(t, serverFunc, nil) + srv := NewTestSwarmServer(t, serverFunc, nil, nil) defer srv.Close() url := fmt.Sprintf("%s/bzz:/", srv.URL) @@ -1231,7 +1230,7 @@ func TestMultiPartUpload(t *testing.T) { // TestBzzGetFileWithResolver tests fetching a file using a mocked ENS 
resolver func TestBzzGetFileWithResolver(t *testing.T) { resolver := newTestResolveValidator("") - srv := NewTestSwarmServer(t, serverFunc, resolver) + srv := NewTestSwarmServer(t, serverFunc, resolver, nil) defer srv.Close() fileNames := []string{"dir1/tmp1.txt", "dir2/tmp2.lock", "dir3/tmp3.rtf"} fileContents := []string{"tmp1textfilevalue", "tmp2lockfilelocked", "tmp3isjustaplaintextfile"} diff --git a/api/http/test_server.go b/api/http/test_server.go index d6c93188d9..6bead92675 100644 --- a/api/http/test_server.go +++ b/api/http/test_server.go @@ -34,12 +34,15 @@ type TestServer interface { ServeHTTP(http.ResponseWriter, *http.Request) } -func NewTestSwarmServer(t *testing.T, serverFunc func(*api.API) TestServer, resolver api.Resolver) *TestSwarmServer { +func NewTestSwarmServer(t *testing.T, serverFunc func(*api.API) TestServer, resolver api.Resolver, + o *localstore.Options) *TestSwarmServer { + swarmDir, err := ioutil.TempDir("", "swarm-storage-test") if err != nil { t.Fatal(err) } - localStore, err := localstore.New(swarmDir, make([]byte, 32), nil) + + localStore, err := localstore.New(swarmDir, make([]byte, 32), o) if err != nil { os.RemoveAll(swarmDir) t.Fatal(err) diff --git a/api/manifest.go b/api/manifest.go index 952899f339..33ca3038b6 100644 --- a/api/manifest.go +++ b/api/manifest.go @@ -22,16 +22,16 @@ import ( "encoding/json" "errors" "fmt" + "io" "net/http" "strings" "time" - "github.com/ethersphere/swarm/storage/feed" - "github.com/ethereum/go-ethereum/common" "github.com/ethersphere/swarm/log" "github.com/ethersphere/swarm/storage" + "github.com/ethersphere/swarm/storage/feed" ) const ( @@ -72,6 +72,7 @@ func (a *API) NewManifest(ctx context.Context, toEncrypt bool) (storage.Address, if err != nil { return nil, err } + addr, wait, err := a.Store(ctx, bytes.NewReader(data), int64(len(data)), toEncrypt) if err != nil { return nil, err @@ -93,6 +94,7 @@ func (a *API) NewFeedManifest(ctx context.Context, feed *feed.Feed) (storage.Add if err != nil 
{ return nil, err } + addr, wait, err := a.Store(ctx, bytes.NewReader(data), int64(len(data)), false) if err != nil { return nil, err diff --git a/chunk/chunk.go b/chunk/chunk.go index c44292bb92..3c4a235654 100644 --- a/chunk/chunk.go +++ b/chunk/chunk.go @@ -38,11 +38,14 @@ var ( type Chunk interface { Address() Address Data() []byte + PinCounter() uint64 + WithPinCounter(p uint64) Chunk } type chunk struct { - addr Address - sdata []byte + addr Address + sdata []byte + pinCounter uint64 } func NewChunk(addr Address, data []byte) Chunk { @@ -52,6 +55,11 @@ func NewChunk(addr Address, data []byte) Chunk { } } +func (c *chunk) WithPinCounter(p uint64) Chunk { + c.pinCounter = p + return c +} + func (c *chunk) Address() Address { return c.addr } @@ -60,6 +68,10 @@ func (c *chunk) Data() []byte { return c.sdata } +func (c *chunk) PinCounter() uint64 { + return c.pinCounter +} + func (self *chunk) String() string { return fmt.Sprintf("Address: %v Chunksize: %v", self.addr.Log(), len(self.sdata)) } @@ -136,6 +148,8 @@ func (m ModeGet) String() string { return "Sync" case ModeGetLookup: return "Lookup" + case ModeGetPin: + return "PinLookup" default: return "Unknown" } @@ -149,6 +163,8 @@ const ( ModeGetSync // ModeGetLookup: when accessed to lookup a a chunk in feeds or other places ModeGetLookup + // ModeGetPin: used when a pinned chunk is accessed + ModeGetPin ) // ModePut enumerates different Putter modes. @@ -188,6 +204,10 @@ func (m ModeSet) String() string { return "Sync" case ModeSetRemove: return "Remove" + case ModeSetPin: + return "ModeSetPin" + case ModeSetUnpin: + return "ModeSetUnpin" default: return "Unknown" } @@ -201,6 +221,10 @@ const ( ModeSetSync // ModeSetRemove: when a chunk is removed ModeSetRemove + // ModeSetPin: when a chunk is pinned during upload or separately + ModeSetPin + // ModeSetUnpin: when a chunk is unpinned using a command locally + ModeSetUnpin ) // Descriptor holds information required for Pull syncing. 
This struct diff --git a/cmd/swarm-smoke/upload_and_sync.go b/cmd/swarm-smoke/upload_and_sync.go index 818ebf53c2..40f43e21d7 100644 --- a/cmd/swarm-smoke/upload_and_sync.go +++ b/cmd/swarm-smoke/upload_and_sync.go @@ -265,7 +265,7 @@ func getAllRefs(testData []byte) (storage.AddressCollection, error) { defer cleanup() reader := bytes.NewReader(testData) - return fileStore.GetAllReferences(context.Background(), reader, false) + return fileStore.GetAllReferences(context.Background(), reader) } func uploadAndSync(c *cli.Context, randomBytes []byte) error { diff --git a/cmd/swarm-smoke/util.go b/cmd/swarm-smoke/util.go index 99f4aa5a4e..6cba20789c 100644 --- a/cmd/swarm-smoke/util.go +++ b/cmd/swarm-smoke/util.go @@ -212,7 +212,7 @@ func uploadWithTag(data []byte, endpoint string, tag string) (string, error) { Tag: tag, } - return swarm.TarUpload("", &client.FileUploader{f}, "", false) + return swarm.TarUpload("", &client.FileUploader{f}, "", false, false) } func digest(r io.Reader) ([]byte, error) { diff --git a/cmd/swarm/access.go b/cmd/swarm/access.go index 735b1d7c5b..d723a4a16e 100644 --- a/cmd/swarm/access.go +++ b/cmd/swarm/access.go @@ -51,6 +51,7 @@ var ( Flags: []cli.Flag{ utils.PasswordFileFlag, SwarmDryRunFlag, + SwarmPinFlag, }, Name: "pass", Usage: "encrypts a reference with a password and embeds it into a root manifest", @@ -64,6 +65,7 @@ var ( utils.PasswordFileFlag, SwarmDryRunFlag, SwarmAccessGrantKeyFlag, + SwarmPinFlag, }, Name: "pk", Usage: "encrypts a reference with the node's private key and a given grantee's public key and embeds it into a root manifest", @@ -77,6 +79,7 @@ var ( SwarmAccessGrantKeysFlag, SwarmDryRunFlag, utils.PasswordFileFlag, + SwarmPinFlag, }, Name: "act", Usage: "encrypts a reference with the node's private key and a given grantee's public key and embeds it into a root manifest", @@ -108,6 +111,7 @@ func accessNewPass(ctx *cli.Context) { ref = args[0] password = getPassPhrase("", false, 0, makePasswordList(ctx)) dryRun = 
ctx.Bool(SwarmDryRunFlag.Name) + toPin = ctx.Bool(SwarmPinFlag.Name) ) accessKey, ae, err = api.DoPassword(ctx, password, salt) if err != nil { @@ -123,7 +127,7 @@ func accessNewPass(ctx *cli.Context) { utils.Fatalf("had an error printing the manifests: %v", err) } } else { - err = uploadManifests(ctx, m, nil) + err = uploadManifests(ctx, m, nil, toPin) if err != nil { utils.Fatalf("had an error uploading the manifests: %v", err) } @@ -144,6 +148,7 @@ func accessNewPK(ctx *cli.Context) { privateKey = getPrivKey(ctx) granteePublicKey = ctx.String(SwarmAccessGrantKeyFlag.Name) dryRun = ctx.Bool(SwarmDryRunFlag.Name) + toPin = ctx.Bool(SwarmPinFlag.Name) ) sessionKey, ae, err = api.DoPK(ctx, privateKey, granteePublicKey, salt) if err != nil { @@ -159,7 +164,7 @@ func accessNewPK(ctx *cli.Context) { utils.Fatalf("had an error printing the manifests: %v", err) } } else { - err = uploadManifests(ctx, m, nil) + err = uploadManifests(ctx, m, nil, toPin) if err != nil { utils.Fatalf("had an error uploading the manifests: %v", err) } @@ -184,6 +189,7 @@ func accessNewACT(ctx *cli.Context) { passGranteesFilename = ctx.String(utils.PasswordFileFlag.Name) privateKey = getPrivKey(ctx) dryRun = ctx.Bool(SwarmDryRunFlag.Name) + toPin = ctx.Bool(SwarmPinFlag.Name) ) if pkGranteesFilename == "" && passGranteesFilename == "" { utils.Fatalf("you have to provide either a grantee public-keys file or an encryption passwords file (or both)") @@ -223,7 +229,7 @@ func accessNewACT(ctx *cli.Context) { utils.Fatalf("had an error printing the manifests: %v", err) } } else { - err = uploadManifests(ctx, m, actManifest) + err = uploadManifests(ctx, m, actManifest, toPin) if err != nil { utils.Fatalf("had an error uploading the manifests: %v", err) } @@ -247,7 +253,7 @@ func printManifests(rootAccessManifest, actManifest *api.Manifest) error { return nil } -func uploadManifests(ctx *cli.Context, rootAccessManifest, actManifest *api.Manifest) error { +func uploadManifests(ctx *cli.Context, 
rootAccessManifest, actManifest *api.Manifest, toPin bool) error { bzzapi := strings.TrimRight(ctx.GlobalString(SwarmApiFlag.Name), "/") client := client.NewClient(bzzapi) @@ -256,14 +262,14 @@ func uploadManifests(ctx *cli.Context, rootAccessManifest, actManifest *api.Mani err error ) if actManifest != nil { - key, err = client.UploadManifest(actManifest, false) + key, err = client.UploadManifest(actManifest, false, toPin) if err != nil { return err } rootAccessManifest.Entries[0].Access.Act = key } - key, err = client.UploadManifest(rootAccessManifest, false) + key, err = client.UploadManifest(rootAccessManifest, false, toPin) if err != nil { return err } diff --git a/cmd/swarm/access_test.go b/cmd/swarm/access_test.go index 67eef7bf01..d07e0cf335 100644 --- a/cmd/swarm/access_test.go +++ b/cmd/swarm/access_test.go @@ -161,7 +161,7 @@ func testPassword(t *testing.T, cluster *testCluster) { client := swarmapi.NewClient(cluster.Nodes[0].URL) - hash, err := client.UploadManifest(&m, false) + hash, err := client.UploadManifest(&m, false, false) if err != nil { t.Fatal(err) } @@ -337,7 +337,7 @@ func testPK(t *testing.T, cluster *testCluster) { } client := swarmapi.NewClient(cluster.Nodes[0].URL) - hash, err := client.UploadManifest(&m, false) + hash, err := client.UploadManifest(&m, false, false) if err != nil { t.Fatal(err) } diff --git a/cmd/swarm/explore.go b/cmd/swarm/explore.go index 0c3714fa71..7e7fd6757e 100644 --- a/cmd/swarm/explore.go +++ b/cmd/swarm/explore.go @@ -49,7 +49,7 @@ func hashes(ctx *cli.Context) { defer f.Close() fileStore := storage.NewFileStore(&storage.FakeChunkStore{}, storage.NewFileStoreParams(), chunk.NewTags()) - refs, err := fileStore.GetAllReferences(context.TODO(), f, false) + refs, err := fileStore.GetAllReferences(context.TODO(), f) if err != nil { utils.Fatalf("%v\n", err) } else { diff --git a/cmd/swarm/feeds_test.go b/cmd/swarm/feeds_test.go index 59e805fd64..4e99c4708f 100644 --- a/cmd/swarm/feeds_test.go +++ 
b/cmd/swarm/feeds_test.go @@ -36,8 +36,8 @@ import ( func TestCLIFeedUpdate(t *testing.T) { srv := swarmhttp.NewTestSwarmServer(t, func(api *api.API) swarmhttp.TestServer { - return swarmhttp.NewServer(api, "") - }, nil) + return swarmhttp.NewServer(api, nil, "") + }, nil, nil) log.Info("starting a test swarm server") defer srv.Close() diff --git a/cmd/swarm/flags.go b/cmd/swarm/flags.go index 8edb630fd5..58ed9edf66 100644 --- a/cmd/swarm/flags.go +++ b/cmd/swarm/flags.go @@ -200,4 +200,8 @@ var ( Name: "legacy", Usage: "Use this flag when importing a db export from a legacy local store database dump (for schemas older than 'sanctuary')", } + SwarmPinFlag = cli.BoolFlag{ + Name: "pin", + Usage: "Use this flag to pin the file after upload is complete. This flag is used when uploading a file.", + } ) diff --git a/cmd/swarm/hash.go b/cmd/swarm/hash.go index 52062c8ad3..4dfadf771c 100644 --- a/cmd/swarm/hash.go +++ b/cmd/swarm/hash.go @@ -79,6 +79,7 @@ func hash(ctx *cli.Context) { stat, _ := f.Stat() fileStore := storage.NewFileStore(&storage.FakeChunkStore{}, storage.NewFileStoreParams(), chunk.NewTags()) + addr, _, err := fileStore.Store(context.TODO(), f, stat.Size(), false) if err != nil { utils.Fatalf("%v\n", err) diff --git a/cmd/swarm/manifest.go b/cmd/swarm/manifest.go index 23fcac3c76..01c1c70dca 100644 --- a/cmd/swarm/manifest.go +++ b/cmd/swarm/manifest.go @@ -38,26 +38,35 @@ var manifestCommand = cli.Command{ { Action: manifestAdd, CustomHelpTemplate: helpTemplate, - Name: "add", - Usage: "add a new path to the manifest", - ArgsUsage: " ", - Description: "Adds a new path to the manifest", + Flags: []cli.Flag{ + SwarmPinFlag, + }, + Name: "add", + Usage: "add a new path to the manifest", + ArgsUsage: " ", + Description: "Adds a new path to the manifest", }, { Action: manifestUpdate, CustomHelpTemplate: helpTemplate, - Name: "update", - Usage: "update the hash for an already existing path in the manifest", - ArgsUsage: " ", - Description: "Update the hash 
for an already existing path in the manifest", + Flags: []cli.Flag{ + SwarmPinFlag, + }, + Name: "update", + Usage: "update the hash for an already existing path in the manifest", + ArgsUsage: " ", + Description: "Update the hash for an already existing path in the manifest", }, { Action: manifestRemove, CustomHelpTemplate: helpTemplate, - Name: "remove", - Usage: "removes a path from the manifest", - ArgsUsage: " ", - Description: "Removes a path from the manifest", + Flags: []cli.Flag{ + SwarmPinFlag, + }, + Name: "remove", + Usage: "removes a path from the manifest", + ArgsUsage: " ", + Description: "Removes a path from the manifest", }, }, } @@ -67,6 +76,8 @@ var manifestCommand = cli.Command{ // with only one entry, which meta-data will be added to the original manifest. // On success, this function will print new (updated) manifest's hash. func manifestAdd(ctx *cli.Context) { + toPin := ctx.Bool(SwarmPinFlag.Name) + args := ctx.Args() if len(args) != 3 { utils.Fatalf("Need exactly three arguments ") @@ -92,7 +103,7 @@ func manifestAdd(ctx *cli.Context) { utils.Fatalf("Too many entries in manifest %s", hash) } - newManifest := addEntryToManifest(client, mhash, path, m.Entries[0]) + newManifest := addEntryToManifest(client, mhash, path, m.Entries[0], toPin) fmt.Println(newManifest) } @@ -101,6 +112,8 @@ func manifestAdd(ctx *cli.Context) { // with only one entry, which meta-data will be added to the original manifest. // On success, this function will print hash of the updated manifest. 
func manifestUpdate(ctx *cli.Context) { + toPin := ctx.Bool(SwarmPinFlag.Name) + args := ctx.Args() if len(args) != 3 { utils.Fatalf("Need exactly three arguments ") @@ -126,7 +139,7 @@ func manifestUpdate(ctx *cli.Context) { utils.Fatalf("Too many entries in manifest %s", hash) } - newManifest, _, defaultEntryUpdated := updateEntryInManifest(client, mhash, path, m.Entries[0], true) + newManifest, _, defaultEntryUpdated := updateEntryInManifest(client, mhash, path, m.Entries[0], true, toPin) if defaultEntryUpdated { // Print informational message to stderr // allowing the user to get the new manifest hash from stdout @@ -140,6 +153,8 @@ func manifestUpdate(ctx *cli.Context) { // On success, this function will print hash of the manifest which does not // contain the path. func manifestRemove(ctx *cli.Context) { + toPin := ctx.Bool(SwarmPinFlag.Name) + args := ctx.Args() if len(args) != 2 { utils.Fatalf("Need exactly two arguments ") @@ -153,11 +168,11 @@ func manifestRemove(ctx *cli.Context) { bzzapi := strings.TrimRight(ctx.GlobalString(SwarmApiFlag.Name), "/") client := swarm.NewClient(bzzapi) - newManifest := removeEntryFromManifest(client, mhash, path) + newManifest := removeEntryFromManifest(client, mhash, path, toPin) fmt.Println(newManifest) } -func addEntryToManifest(client *swarm.Client, mhash, path string, entry api.ManifestEntry) string { +func addEntryToManifest(client *swarm.Client, mhash, path string, entry api.ManifestEntry, toPin bool) string { var longestPathEntry = api.ManifestEntry{} mroot, isEncrypted, err := client.DownloadManifest(mhash) @@ -182,7 +197,7 @@ func addEntryToManifest(client *swarm.Client, mhash, path string, entry api.Mani if longestPathEntry.Path != "" { // Load the child Manifest add the entry there newPath := path[len(longestPathEntry.Path):] - newHash := addEntryToManifest(client, longestPathEntry.Hash, newPath, entry) + newHash := addEntryToManifest(client, longestPathEntry.Hash, newPath, entry, toPin) // Replace the hash for 
parent Manifests newMRoot := &api.Manifest{} @@ -199,7 +214,7 @@ func addEntryToManifest(client *swarm.Client, mhash, path string, entry api.Mani mroot.Entries = append(mroot.Entries, entry) } - newManifestHash, err := client.UploadManifest(mroot, isEncrypted) + newManifestHash, err := client.UploadManifest(mroot, isEncrypted, toPin) if err != nil { utils.Fatalf("Manifest upload failed: %v", err) } @@ -212,7 +227,7 @@ func addEntryToManifest(client *swarm.Client, mhash, path string, entry api.Mani // default entry in root manifest will be updated too. // Returned values are the new manifest hash, hash of the entry that was replaced by the new entry and // a a bool that is true if default entry is updated. -func updateEntryInManifest(client *swarm.Client, mhash, path string, entry api.ManifestEntry, isRoot bool) (newManifestHash, oldHash string, defaultEntryUpdated bool) { +func updateEntryInManifest(client *swarm.Client, mhash, path string, entry api.ManifestEntry, isRoot bool, toPin bool) (newManifestHash, oldHash string, defaultEntryUpdated bool) { var ( newEntry = api.ManifestEntry{} longestPathEntry = api.ManifestEntry{} @@ -248,7 +263,7 @@ func updateEntryInManifest(client *swarm.Client, mhash, path string, entry api.M // Load the child Manifest add the entry there newPath := path[len(longestPathEntry.Path):] var newHash string - newHash, oldHash, _ = updateEntryInManifest(client, longestPathEntry.Hash, newPath, entry, false) + newHash, oldHash, _ = updateEntryInManifest(client, longestPathEntry.Hash, newPath, entry, false, toPin) // Replace the hash for parent Manifests newMRoot := &api.Manifest{} @@ -282,14 +297,14 @@ func updateEntryInManifest(client *swarm.Client, mhash, path string, entry api.M mroot = newMRoot } - newManifestHash, err = client.UploadManifest(mroot, isEncrypted) + newManifestHash, err = client.UploadManifest(mroot, isEncrypted, toPin) if err != nil { utils.Fatalf("Manifest upload failed: %v", err) } return newManifestHash, oldHash, 
defaultEntryUpdated } -func removeEntryFromManifest(client *swarm.Client, mhash, path string) string { +func removeEntryFromManifest(client *swarm.Client, mhash, path string, toPin bool) string { var ( entryToRemove = api.ManifestEntry{} longestPathEntry = api.ManifestEntry{} @@ -321,7 +336,7 @@ func removeEntryFromManifest(client *swarm.Client, mhash, path string) string { if longestPathEntry.Path != "" { // Load the child Manifest remove the entry there newPath := path[len(longestPathEntry.Path):] - newHash := removeEntryFromManifest(client, longestPathEntry.Hash, newPath) + newHash := removeEntryFromManifest(client, longestPathEntry.Hash, newPath, toPin) // Replace the hash for parent Manifests newMRoot := &api.Manifest{} @@ -345,7 +360,7 @@ func removeEntryFromManifest(client *swarm.Client, mhash, path string) string { mroot = newMRoot } - newManifestHash, err := client.UploadManifest(mroot, isEncrypted) + newManifestHash, err := client.UploadManifest(mroot, isEncrypted, toPin) if err != nil { utils.Fatalf("Manifest upload failed: %v", err) } diff --git a/cmd/swarm/manifest_test.go b/cmd/swarm/manifest_test.go index 8b0327a824..7d7762bc48 100644 --- a/cmd/swarm/manifest_test.go +++ b/cmd/swarm/manifest_test.go @@ -58,7 +58,7 @@ func TestManifestChangeEncrypted(t *testing.T) { // Argument encrypt controls whether to use encryption or not. 
func testManifestChange(t *testing.T, encrypt bool) { t.Parallel() - srv := swarmhttp.NewTestSwarmServer(t, serverFunc, nil) + srv := swarmhttp.NewTestSwarmServer(t, serverFunc, nil, nil) defer srv.Close() tmp, err := ioutil.TempDir("", "swarm-manifest-test") @@ -430,7 +430,7 @@ func TestNestedDefaultEntryUpdateEncrypted(t *testing.T) { func testNestedDefaultEntryUpdate(t *testing.T, encrypt bool) { t.Parallel() - srv := swarmhttp.NewTestSwarmServer(t, serverFunc, nil) + srv := swarmhttp.NewTestSwarmServer(t, serverFunc, nil, nil) defer srv.Close() tmp, err := ioutil.TempDir("", "swarm-manifest-test") diff --git a/cmd/swarm/run_test.go b/cmd/swarm/run_test.go index 7234842df6..cd68b7e747 100644 --- a/cmd/swarm/run_test.go +++ b/cmd/swarm/run_test.go @@ -60,7 +60,7 @@ func init() { const clusterSize = 3 func serverFunc(api *api.API) swarmhttp.TestServer { - return swarmhttp.NewServer(api, "") + return swarmhttp.NewServer(api, nil, "") } func TestMain(m *testing.M) { // check if we have been reexec'd diff --git a/cmd/swarm/upload.go b/cmd/swarm/upload.go index ab8e246d95..0eda12d09c 100644 --- a/cmd/swarm/upload.go +++ b/cmd/swarm/upload.go @@ -42,7 +42,7 @@ var upCommand = cli.Command{ Name: "up", Usage: "uploads a file or directory to swarm using the HTTP API", ArgsUsage: "", - Flags: []cli.Flag{SwarmEncryptedFlag}, + Flags: []cli.Flag{SwarmEncryptedFlag, SwarmPinFlag}, Description: "uploads a file or directory to swarm using the HTTP API and prints the root hash", } @@ -57,6 +57,7 @@ func upload(ctx *cli.Context) { mimeType = ctx.GlobalString(SwarmUploadMimeType.Name) client = swarm.NewClient(bzzapi) toEncrypt = ctx.Bool(SwarmEncryptedFlag.Name) + toPin = ctx.Bool(SwarmPinFlag.Name) autoDefaultPath = false file string ) @@ -94,7 +95,7 @@ func upload(ctx *cli.Context) { utils.Fatalf("Error opening file: %s", err) } defer f.Close() - hash, err := client.UploadRaw(f, f.Size, toEncrypt) + hash, err := client.UploadRaw(f, f.Size, toEncrypt, toPin) if err != nil { 
utils.Fatalf("Upload failed: %s", err) } @@ -135,7 +136,7 @@ func upload(ctx *cli.Context) { defaultPath = strings.TrimPrefix(absDefaultPath, absFile) } } - return client.UploadDirectory(file, defaultPath, "", toEncrypt) + return client.UploadDirectory(file, defaultPath, "", toEncrypt, toPin) } } else { doUpload = func() (string, error) { @@ -147,7 +148,7 @@ func upload(ctx *cli.Context) { if mimeType != "" { f.ContentType = mimeType } - return client.Upload(f, "", toEncrypt) + return client.Upload(f, "", toEncrypt, toPin) } } hash, err := doUpload() diff --git a/network/retrieval/retrieve_test.go b/network/retrieval/retrieve_test.go index 31b6ee1352..d26ace9a8d 100644 --- a/network/retrieval/retrieve_test.go +++ b/network/retrieval/retrieve_test.go @@ -372,7 +372,7 @@ func getAllRefs(testData []byte) (storage.AddressCollection, error) { defer cleanup() reader := bytes.NewReader(testData) - return fileStore.GetAllReferences(context.Background(), reader, false) + return fileStore.GetAllReferences(context.Background(), reader) } func getChunks(store chunk.Store) (chunks map[string]struct{}, err error) { diff --git a/network/stream/common_test.go b/network/stream/common_test.go index 739ff548fb..b08e7ef39c 100644 --- a/network/stream/common_test.go +++ b/network/stream/common_test.go @@ -403,5 +403,5 @@ func getAllRefs(testData []byte) (storage.AddressCollection, error) { defer cleanup() reader := bytes.NewReader(testData) - return fileStore.GetAllReferences(context.Background(), reader, false) + return fileStore.GetAllReferences(context.Background(), reader) } diff --git a/network/stream/delivery.go b/network/stream/delivery.go index 50df0dabf0..dd80e3824a 100644 --- a/network/stream/delivery.go +++ b/network/stream/delivery.go @@ -30,7 +30,7 @@ import ( "github.com/ethersphere/swarm/network/timeouts" "github.com/ethersphere/swarm/spancontext" "github.com/ethersphere/swarm/storage" - opentracing "github.com/opentracing/opentracing-go" + 
"github.com/opentracing/opentracing-go" olog "github.com/opentracing/opentracing-go/log" ) @@ -162,6 +162,7 @@ func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req int msg.peer = sp log.Trace("handle.chunk.delivery", "put", msg.Addr) + _, err := d.netStore.Put(ctx, mode, storage.NewChunk(msg.Addr, msg.SData)) if err != nil { if err == storage.ErrChunkInvalid { diff --git a/shed/example_store_test.go b/shed/example_store_test.go index 4c9631923f..3db4d75c13 100644 --- a/shed/example_store_test.go +++ b/shed/example_store_test.go @@ -290,7 +290,7 @@ func (s *Store) GetSchema() (name string, err error) { return name, err } -// GetSchema is an example of storing the most simple +// PutSchema is an example of storing the most simple // string in a database field. func (s *Store) PutSchema(name string) (err error) { return s.schemaName.Put(name) diff --git a/shed/index.go b/shed/index.go index 38afbce4ca..7b01247fbc 100644 --- a/shed/index.go +++ b/shed/index.go @@ -41,6 +41,7 @@ type Item struct { AccessTimestamp int64 StoreTimestamp int64 BinID uint64 + PinCounter uint64 // maintains the no of time a chunk is pinned } // Merge is a helper method to construct a new @@ -62,6 +63,9 @@ func (i Item) Merge(i2 Item) (new Item) { if i.BinID == 0 { i.BinID = i2.BinID } + if i.PinCounter == 0 { + i.PinCounter = i2.PinCounter + } return i } diff --git a/state/dbstore.go b/state/dbstore.go index 1b541e7854..6f5a73bf25 100644 --- a/state/dbstore.go +++ b/state/dbstore.go @@ -23,6 +23,7 @@ import ( "github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb/storage" + "github.com/syndtr/goleveldb/leveldb/util" ) // ErrNotFound is returned when no results are returned from the database @@ -34,6 +35,7 @@ type Store interface { Get(key string, i interface{}) (err error) Put(key string, i interface{}) (err error) Delete(key string) (err error) + Iterate(prefix string, iterFunc func([]byte, []byte)) (err error) Close() error } @@ -105,6 +107,17 @@ 
func (s *DBStore) Delete(key string) (err error) { return s.db.Delete([]byte(key), nil) } +// Iterate entries which has a specific prefix +// The key and value may be modified in the iterFunc +func (s *DBStore) Iterate(prefix string, iterFunc func([]byte, []byte)) (err error) { + iter := s.db.NewIterator(util.BytesPrefix([]byte(prefix)), nil) + for iter.Next() { + iterFunc(iter.Key(), iter.Value()) + } + iter.Release() + return iter.Error() +} + // Close releases the resources used by the underlying LevelDB. func (s *DBStore) Close() error { return s.db.Close() diff --git a/storage/feed/handler.go b/storage/feed/handler.go index cbaaac1819..ed4c4251e7 100644 --- a/storage/feed/handler.go +++ b/storage/feed/handler.go @@ -26,11 +26,9 @@ import ( "sync/atomic" "github.com/ethersphere/swarm/chunk" - - "github.com/ethersphere/swarm/storage/feed/lookup" - "github.com/ethersphere/swarm/log" "github.com/ethersphere/swarm/storage" + "github.com/ethersphere/swarm/storage/feed/lookup" ) type Handler struct { diff --git a/storage/filestore.go b/storage/filestore.go index ff5ffe3970..ec8e9f73af 100644 --- a/storage/filestore.go +++ b/storage/filestore.go @@ -91,6 +91,7 @@ func (f *FileStore) Retrieve(ctx context.Context, addr Address) (reader *LazyChu if err != nil { tag = chunk.NewTag(0, "ephemeral-retrieval-tag", 0) } + getter := NewHasherStore(f.ChunkStore, f.hashFunc, isEncrypted, tag) reader = TreeJoin(ctx, addr, getter, 0) return @@ -117,12 +118,12 @@ func (f *FileStore) HashSize() int { } // GetAllReferences is a public API. 
This endpoint returns all chunk hashes (only) for a given file -func (f *FileStore) GetAllReferences(ctx context.Context, data io.Reader, toEncrypt bool) (addrs AddressCollection, err error) { +func (f *FileStore) GetAllReferences(ctx context.Context, data io.Reader) (addrs AddressCollection, err error) { tag := chunk.NewTag(0, "ephemeral-tag", 0) //this tag is just a mock ephemeral tag since we don't want to save these results // create a special kind of putter, which only will store the references putter := &hashExplorer{ - hasherStore: NewHasherStore(f.ChunkStore, f.hashFunc, toEncrypt, tag), + hasherStore: NewHasherStore(f.ChunkStore, f.hashFunc, false, tag), } // do the actual splitting anyway, no way around it _, wait, err := PyramidSplit(ctx, data, putter, putter, tag) @@ -157,6 +158,7 @@ func (he *hashExplorer) Put(ctx context.Context, chunkData ChunkData) (Reference if err != nil { return nil, err } + // internally store the reference he.lock.Lock() he.references = append(he.references, ref) diff --git a/storage/filestore_test.go b/storage/filestore_test.go index a6fc3e5a20..92f0ffcfc9 100644 --- a/storage/filestore_test.go +++ b/storage/filestore_test.go @@ -191,7 +191,7 @@ func TestGetAllReferences(t *testing.T) { for i, r := range testRuns { slice := testutil.RandomBytes(1, r) - addrs, err := fileStore.GetAllReferences(context.Background(), bytes.NewReader(slice), false) + addrs, err := fileStore.GetAllReferences(context.Background(), bytes.NewReader(slice)) if err != nil { t.Fatal(err) } diff --git a/storage/localstore/export.go b/storage/localstore/export.go index bc27c3a301..b8826d976e 100644 --- a/storage/localstore/export.go +++ b/storage/localstore/export.go @@ -59,11 +59,13 @@ func (db *DB) Export(w io.Writer) (count int64, err error) { } err = db.retrievalDataIndex.Iterate(func(item shed.Item) (stop bool, err error) { + hdr := &tar.Header{ Name: hex.EncodeToString(item.Address), Mode: 0644, Size: int64(len(item.Data)), } + if err := 
tw.WriteHeader(hdr); err != nil { return false, err } diff --git a/storage/localstore/gc.go b/storage/localstore/gc.go index 022d524bd1..105fc94983 100644 --- a/storage/localstore/gc.go +++ b/storage/localstore/gc.go @@ -62,7 +62,7 @@ func (db *DB) collectGarbageWorker() { db.triggerGarbageCollection() } - if collectedCount > 0 && testHookCollectGarbage != nil { + if testHookCollectGarbage != nil { testHookCollectGarbage(collectedCount) } case <-db.close: @@ -94,6 +94,14 @@ func (db *DB) collectGarbage() (collectedCount uint64, done bool, err error) { db.batchMu.Lock() defer db.batchMu.Unlock() + // run through the recently pinned chunks and + // remove them from the gcIndex before iterating through gcIndex + err = db.removeChunksInExcludeIndexFromGC() + if err != nil { + log.Error("localstore exclude pinned chunks", "err", err) + return 0, true, err + } + gcSize, err := db.gcSize.Get() if err != nil { return 0, true, err @@ -138,6 +146,68 @@ func (db *DB) collectGarbage() (collectedCount uint64, done bool, err error) { return collectedCount, done, nil } +// removeChunksInExcludeIndexFromGC removed any recently chunks in the exclude Index, from the gcIndex. 
+func (db *DB) removeChunksInExcludeIndexFromGC() (err error) { + metricName := "localstore.gc.exclude" + metrics.GetOrRegisterCounter(metricName, nil).Inc(1) + defer totalTimeMetric(metricName, time.Now()) + defer func() { + if err != nil { + metrics.GetOrRegisterCounter(metricName+".error", nil).Inc(1) + } + }() + + batch := new(leveldb.Batch) + excludedCount := 0 + var gcSizeChange int64 + err = db.gcExcludeIndex.Iterate(func(item shed.Item) (stop bool, err error) { + // Get access timestamp + retrievalAccessIndexItem, err := db.retrievalAccessIndex.Get(item) + if err != nil { + return false, err + } + item.AccessTimestamp = retrievalAccessIndexItem.AccessTimestamp + + // Get the binId + retrievalDataIndexItem, err := db.retrievalDataIndex.Get(item) + if err != nil { + return false, err + } + item.BinID = retrievalDataIndexItem.BinID + + // Check if this item is in gcIndex and remove it + ok, err := db.gcIndex.Has(item) + if ok { + db.gcIndex.DeleteInBatch(batch, item) + if _, err := db.gcIndex.Get(item); err == nil { + gcSizeChange-- + } + excludedCount++ + db.gcExcludeIndex.DeleteInBatch(batch, item) + } + + return false, nil + }, nil) + if err != nil { + return err + } + + // update the gc size based on the no of entries deleted in gcIndex + err = db.incGCSizeInBatch(batch, gcSizeChange) + if err != nil { + return err + } + + metrics.GetOrRegisterCounter(metricName+".excluded-count", nil).Inc(int64(excludedCount)) + err = db.shed.WriteBatch(batch) + if err != nil { + metrics.GetOrRegisterCounter(metricName+".writebatch.err", nil).Inc(1) + return err + } + + return nil +} + // gcTrigger retruns the absolute value for garbage collection // target value, calculated from db.capacity and gcTargetRatio. 
func (db *DB) gcTarget() (target uint64) { diff --git a/storage/localstore/gc_test.go b/storage/localstore/gc_test.go index c225088940..7dd4ee8fd2 100644 --- a/storage/localstore/gc_test.go +++ b/storage/localstore/gc_test.go @@ -17,6 +17,7 @@ package localstore import ( + "bytes" "context" "io/ioutil" "math/rand" @@ -25,6 +26,7 @@ import ( "time" "github.com/ethersphere/swarm/chunk" + "github.com/ethersphere/swarm/shed" ) // TestDB_collectGarbageWorker tests garbage collection runs @@ -81,6 +83,7 @@ func testDB_collectGarbageWorker(t *testing.T) { } addrs = append(addrs, ch.Address()) + } gcTarget := db.gcTarget() @@ -114,6 +117,15 @@ func testDB_collectGarbageWorker(t *testing.T) { } }) + t.Run("only first inserted chunks should be removed", func(t *testing.T) { + for i := 0; i < (chunkCount - int(gcTarget)); i++ { + _, err := db.Get(context.Background(), chunk.ModeGetRequest, addrs[i]) + if err != chunk.ErrChunkNotFound { + t.Errorf("got error %v, want %v", err, chunk.ErrChunkNotFound) + } + } + }) + // last synced chunk should not be removed t.Run("get most recent synced chunk", func(t *testing.T) { _, err := db.Get(context.Background(), chunk.ModeGetRequest, addrs[len(addrs)-1]) @@ -123,6 +135,163 @@ func testDB_collectGarbageWorker(t *testing.T) { }) } +// Pin a file, upload chunks to go past the gc limit to trigger GC, +// check if the pinned files are still around and removed from gcIndex +func TestPinGC(t *testing.T) { + + chunkCount := 150 + pinChunksCount := 50 + dbCapacity := uint64(100) + + db, cleanupFunc := newTestDB(t, &Options{ + Capacity: dbCapacity, + }) + testHookCollectGarbageChan := make(chan uint64) + defer setTestHookCollectGarbage(func(collectedCount uint64) { + select { + case testHookCollectGarbageChan <- collectedCount: + case <-db.close: + } + })() + defer cleanupFunc() + + addrs := make([]chunk.Address, 0) + pinAddrs := make([]chunk.Address, 0) + + // upload random chunks + for i := 0; i < chunkCount; i++ { + ch := 
generateTestRandomChunk() + + _, err := db.Put(context.Background(), chunk.ModePutUpload, ch) + if err != nil { + t.Fatal(err) + } + + err = db.Set(context.Background(), chunk.ModeSetSync, ch.Address()) + if err != nil { + t.Fatal(err) + } + + addrs = append(addrs, ch.Address()) + + // Pin the chunks at the beginning to make sure they are not removed by GC + if i < pinChunksCount { + err = db.Set(context.Background(), chunk.ModeSetPin, ch.Address()) + if err != nil { + t.Fatal(err) + } + pinAddrs = append(pinAddrs, ch.Address()) + } + } + gcTarget := db.gcTarget() + + for { + select { + case <-testHookCollectGarbageChan: + case <-time.After(10 * time.Second): + t.Error("collect garbage timeout") + } + gcSize, err := db.gcSize.Get() + if err != nil { + t.Fatal(err) + } + if gcSize == gcTarget { + break + } + } + + t.Run("pin Index count", newItemsCountTest(db.pinIndex, int(pinChunksCount))) + + t.Run("gc exclude index count", newItemsCountTest(db.gcExcludeIndex, int(0))) + + t.Run("pull index count", newItemsCountTest(db.pullIndex, int(gcTarget)+pinChunksCount)) + + t.Run("gc index count", newItemsCountTest(db.gcIndex, int(gcTarget))) + + t.Run("gc size", newIndexGCSizeTest(db)) + + t.Run("pinned chunk not in gc Index", func(t *testing.T) { + err := db.gcIndex.Iterate(func(item shed.Item) (stop bool, err error) { + for _, pinHash := range pinAddrs { + if bytes.Equal(pinHash, item.Address) { + t.Fatal("pin chunk present in gcIndex") + } + } + return false, nil + }, nil) + if err != nil { + t.Fatal("could not iterate gcIndex") + } + }) + + t.Run("pinned chunks exists", func(t *testing.T) { + for _, hash := range pinAddrs { + _, err := db.Get(context.Background(), chunk.ModeGetRequest, hash) + if err != nil { + t.Fatal(err) + } + } + }) + + t.Run("first chunks after pinned chunks should be removed", func(t *testing.T) { + for i := pinChunksCount; i < (int(dbCapacity) - int(gcTarget)); i++ { + _, err := db.Get(context.Background(), chunk.ModeGetRequest, addrs[i]) + if 
err != chunk.ErrChunkNotFound { + t.Fatal(err) + } + } + }) +} + +// Upload chunks, pin those chunks, add to GC after it is pinned +// check if the pinned files are still around +func TestGCAfterPin(t *testing.T) { + + chunkCount := 50 + + db, cleanupFunc := newTestDB(t, &Options{ + Capacity: 100, + }) + defer cleanupFunc() + + pinAddrs := make([]chunk.Address, 0) + + // upload random chunks + for i := 0; i < chunkCount; i++ { + ch := generateTestRandomChunk() + + _, err := db.Put(context.Background(), chunk.ModePutUpload, ch) + if err != nil { + t.Fatal(err) + } + + // Pin before adding to GC in ModeSetSync + err = db.Set(context.Background(), chunk.ModeSetPin, ch.Address()) + if err != nil { + t.Fatal(err) + } + pinAddrs = append(pinAddrs, ch.Address()) + + err = db.Set(context.Background(), chunk.ModeSetSync, ch.Address()) + if err != nil { + t.Fatal(err) + } + } + + t.Run("pin Index count", newItemsCountTest(db.pinIndex, int(chunkCount))) + + t.Run("gc exclude index count", newItemsCountTest(db.gcExcludeIndex, int(chunkCount))) + + t.Run("gc index count", newItemsCountTest(db.gcIndex, int(0))) + + for _, hash := range pinAddrs { + _, err := db.Get(context.Background(), chunk.ModeGetRequest, hash) + if err != nil { + t.Fatal(err) + } + } +} + // TestDB_collectGarbageWorker_withRequests is a helper test function // to test garbage collection runs by uploading, syncing and // requesting a number of chunks. 
diff --git a/storage/localstore/localstore.go b/storage/localstore/localstore.go index 265cd045ed..5090a9d8a1 100644 --- a/storage/localstore/localstore.go +++ b/storage/localstore/localstore.go @@ -80,6 +80,12 @@ type DB struct { // garbage collection index gcIndex shed.Index + // garbage collection exclude index for pinned contents + gcExcludeIndex shed.Index + + // pin files Index + pinIndex shed.Index + // field that stores number of intems in gc index gcSize shed.Uint64Field @@ -336,6 +342,50 @@ func New(path string, baseKey []byte, o *Options) (db *DB, err error) { if err != nil { return nil, err } + + // Create a index structure for storing pinned chunks and their pin counts + db.pinIndex, err = db.shed.NewIndex("Hash->PinCounter", shed.IndexFuncs{ + EncodeKey: func(fields shed.Item) (key []byte, err error) { + return fields.Address, nil + }, + DecodeKey: func(key []byte) (e shed.Item, err error) { + e.Address = key + return e, nil + }, + EncodeValue: func(fields shed.Item) (value []byte, err error) { + b := make([]byte, 8) + binary.BigEndian.PutUint64(b[:8], fields.PinCounter) + return b, nil + }, + DecodeValue: func(keyItem shed.Item, value []byte) (e shed.Item, err error) { + e.PinCounter = binary.BigEndian.Uint64(value[:8]) + return e, nil + }, + }) + if err != nil { + return nil, err + } + + // Create a index structure for excluding pinned chunks from gcIndex + db.gcExcludeIndex, err = db.shed.NewIndex("Hash->nil", shed.IndexFuncs{ + EncodeKey: func(fields shed.Item) (key []byte, err error) { + return fields.Address, nil + }, + DecodeKey: func(key []byte) (e shed.Item, err error) { + e.Address = key + return e, nil + }, + EncodeValue: func(fields shed.Item) (value []byte, err error) { + return nil, nil + }, + DecodeValue: func(keyItem shed.Item, value []byte) (e shed.Item, err error) { + return e, nil + }, + }) + if err != nil { + return nil, err + } + // start garbage collection worker go db.collectGarbageWorker() return db, nil diff --git 
a/storage/localstore/mode_get.go b/storage/localstore/mode_get.go index d4b17936c5..1ded4ec3b2 100644 --- a/storage/localstore/mode_get.go +++ b/storage/localstore/mode_get.go @@ -52,7 +52,7 @@ func (db *DB) Get(ctx context.Context, mode chunk.ModeGet, addr chunk.Address) ( } return nil, err } - return chunk.NewChunk(out.Address, out.Data), nil + return chunk.NewChunk(out.Address, out.Data).WithPinCounter(out.PinCounter), nil } // get returns Item from the retrieval index @@ -96,6 +96,13 @@ func (db *DB) get(mode chunk.ModeGet, addr chunk.Address) (out shed.Item, err er } }() + case chunk.ModeGetPin: + pinnedItem, err := db.pinIndex.Get(item) + if err != nil { + return out, err + } + return pinnedItem, nil + // no updates to indexes case chunk.ModeGetSync: case chunk.ModeGetLookup: diff --git a/storage/localstore/mode_set.go b/storage/localstore/mode_set.go index 2e812f01f5..84c05476ce 100644 --- a/storage/localstore/mode_set.go +++ b/storage/localstore/mode_set.go @@ -138,9 +138,16 @@ func (db *DB) set(mode chunk.ModeSet, addr chunk.Address) (err error) { item.AccessTimestamp = now() db.retrievalAccessIndex.PutInBatch(batch, item) db.pushIndex.DeleteInBatch(batch, item) - db.gcIndex.PutInBatch(batch, item) - gcSizeChange++ + // Add in gcIndex only if this chunk is not pinned + ok, err := db.pinIndex.Has(item) + if err != nil { + return err + } + if !ok { + db.gcIndex.PutInBatch(batch, item) + gcSizeChange++ + } case chunk.ModeSetRemove: // delete from retrieve, pull, gc @@ -174,6 +181,42 @@ func (db *DB) set(mode chunk.ModeSet, addr chunk.Address) (err error) { gcSizeChange = -1 } + case chunk.ModeSetPin: + // Get the existing pin counter of the chunk + existingPinCounter := uint64(0) + pinnedChunk, err := db.pinIndex.Get(item) + if err != nil { + if err == leveldb.ErrNotFound { + // If this Address is not present in DB, then its a new entry + existingPinCounter = 0 + + // Add in gcExcludeIndex of the chunk is not pinned already + 
db.gcExcludeIndex.PutInBatch(batch, item) + } else { + return err + } + } else { + existingPinCounter = pinnedChunk.PinCounter + } + + // Otherwise increase the existing counter by 1 + item.PinCounter = existingPinCounter + 1 + db.pinIndex.PutInBatch(batch, item) + case chunk.ModeSetUnpin: + // Get the existing pin counter of the chunk + pinnedChunk, err := db.pinIndex.Get(item) + if err != nil { + return err + } + + // Decrement the pin counter or + // delete it from pin index if the pin counter has reached 0 + if pinnedChunk.PinCounter > 1 { + item.PinCounter = pinnedChunk.PinCounter - 1 + db.pinIndex.PutInBatch(batch, item) + } else { + db.pinIndex.DeleteInBatch(batch, item) + } default: return ErrInvalidMode } diff --git a/storage/pin/pin.go b/storage/pin/pin.go new file mode 100644 index 0000000000..917c880fbe --- /dev/null +++ b/storage/pin/pin.go @@ -0,0 +1,480 @@ +// Copyright 2019 The Swarm Authors +// This file is part of the Swarm library. +// +// The Swarm library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The Swarm library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the Swarm library. If not, see . 
package pin

import (
	"context"
	"encoding/binary"
	"encoding/hex"
	"errors"
	"sync"

	"github.com/ethersphere/swarm/api"
	"github.com/ethersphere/swarm/chunk"
	"github.com/ethersphere/swarm/log"
	"github.com/ethersphere/swarm/state"
	"github.com/ethersphere/swarm/storage"
	"github.com/ethersphere/swarm/storage/localstore"
)

const (
	// Version is exposed as the version of the "pin" RPC namespace.
	Version = "1.0"
	// WorkerChanSize is the buffer size of the channels used while walking
	// the file tree (it bounds the number of in-flight walk goroutines).
	WorkerChanSize = 8 // Max no of goroutines when walking the file tree
)

var (
	// errInvalidChunkData is returned when a chunk loaded from the local
	// store is too short to hold the 8-byte span plus at least 1 data byte.
	errInvalidChunkData = errors.New("invalid chunk data")
	// errInvalidUnmarshallData is returned by FileInfo.UnmarshalBinary when
	// the stored value is not exactly 17 bytes long.
	errInvalidUnmarshallData = errors.New("invalid data length")
)

// FileInfo is the struct that stores the information about pinned files
// A map of this is stored in the state DB
type FileInfo struct {
	isRaw      bool   // true if the pinned content is a raw file (no manifest)
	fileSize   uint64 // file size as recorded in the root chunk's span
	pinCounter uint64 // number of times this root hash has been pinned
}

// MarshalBinary encodes the FileInfo object in to a binary form for storage.
// Layout (17 bytes): [0] isRaw flag, [1:9] fileSize, [9:17] pinCounter,
// both integers big-endian.
func (f *FileInfo) MarshalBinary() (data []byte, err error) {
	data = make([]byte, 17)
	if f.isRaw {
		data[0] = 1
	} else {
		data[0] = 0
	}
	binary.BigEndian.PutUint64(data[1:], f.fileSize)
	binary.BigEndian.PutUint64(data[9:], f.pinCounter)
	return data, nil
}

// UnmarshalBinary decodes the binary form from the state store to the FileInfo
// object. It is the inverse of MarshalBinary and rejects any value that is
// not exactly 17 bytes long.
func (f *FileInfo) UnmarshalBinary(data []byte) error {
	if len(data) != 17 {
		return errInvalidUnmarshallData
	}
	if data[0] == 1 {
		f.isRaw = true
	} else {
		f.isRaw = false
	}
	f.fileSize = binary.BigEndian.Uint64(data[1:])
	f.pinCounter = binary.BigEndian.Uint64(data[9:])
	return nil
}

// API is the main object which implements all things pinning.
+type API struct { + db *localstore.DB + api *api.API + fileParams *storage.FileStoreParams + tag *chunk.Tags + hashSize int + state state.Store // the state store used to store info about pinned files +} + +// NewAPI creates a API object that is required for pinning and unpinning +func NewAPI(lstore *localstore.DB, stateStore state.Store, params *storage.FileStoreParams, tags *chunk.Tags, api *api.API) *API { + hashFunc := storage.MakeHashFunc(storage.DefaultHash) + return &API{ + db: lstore, + api: api, + fileParams: params, + tag: tags, + hashSize: hashFunc().Size(), + state: stateStore, + } +} + +// PinFiles is used to pin a RAW file or a collection (which hash manifest's) +// to the local Swarm node. It takes the root hash as the argument and walks +// down the merkle tree and pin all the chunks that are encountered on the +// way. It pins both data chunk and tree chunks. The pre-requisite is that +// the file should be present in the local database. This function is called +// from two places 1) Just after the file is uploaded 2) anytime after +// uploading the file using the pin command. This function can pin both +// encrypted and non-encrypted files. +func (p *API) PinFiles(addr []byte, isRaw bool, credentials string) error { + hasChunk, err := p.db.Has(context.Background(), chunk.Address(p.removeDecryptionKeyFromChunkHash(addr))) + if !hasChunk { + log.Error("Could not pin hash. File not uploaded", "rootHash", hex.EncodeToString(addr)) + return err + } + + // Walk the root hash and pin all the chunks + walkerFunction := func(ref storage.Reference) error { + chunkAddr := p.removeDecryptionKeyFromChunkHash(ref) + err := p.db.Set(context.Background(), chunk.ModeSetPin, chunkAddr) + if err != nil { + log.Error("Could not pin chunk. 
Address "+"Address", hex.EncodeToString(chunkAddr)) + return err + } else { + log.Trace("Pinning chunk", "Address", hex.EncodeToString(chunkAddr)) + } + return nil + } + err = p.walkChunksFromRootHash(addr, isRaw, credentials, walkerFunction) + if err != nil { + log.Error("Error walking root hash.", "Hash", hex.EncodeToString(addr), "err", err) + return nil + } + + // Check if the root hash is already pinned and add it to the fileInfo struct + fileInfo, err := p.getPinnedFile(addr) + if err != nil { + // Get the file size from the root chunk first 8 bytes + hashFunc := storage.MakeHashFunc(storage.DefaultHash) + isEncrypted := len(addr) > hashFunc().Size() + getter := storage.NewHasherStore(p.db, hashFunc, isEncrypted, chunk.NewTag(0, "show-chunks-tag", 0)) + chunkData, err := getter.Get(context.Background(), addr) + if err != nil { + log.Error("Error getting chunk data from localstore.", "Address", hex.EncodeToString(addr)) + return nil + } + fileSize := chunkData.Size() + + // Get the pin counter from the pinIndex + pinCounter, err := p.getPinCounterOfChunk(chunk.Address(p.removeDecryptionKeyFromChunkHash(addr))) + if err != nil { + log.Error("Error getting pin counter of root hash.", "rootHash", hex.EncodeToString(addr), "err", err) + return nil + } + + fileInfo = FileInfo{ + isRaw: isRaw, + fileSize: fileSize, + pinCounter: pinCounter, + } + } else { + // Get the pin counter from the pinIndex + pinCounter, err := p.getPinCounterOfChunk(chunk.Address(p.removeDecryptionKeyFromChunkHash(addr))) + if err != nil { + log.Error("Error getting pin counter of root hash.", "rootHash", hex.EncodeToString(addr), "err", err) + return nil + } + fileInfo.pinCounter = pinCounter + } + + // Store the pinned files in state DB + err = p.savePinnedFile(addr, fileInfo) + if err != nil { + log.Error("Error saving pinned file info to state store.", "rootHash", hex.EncodeToString(addr), "err", err) + return nil + } + + log.Debug("File pinned", "Address", hex.EncodeToString(addr)) + 
return nil +} + +// UnPinFiles is used to unpin an already pinned file. It takes the root +// hash of the file and walks down the merkle tree unpinning all the chunks +// that are encountered on the way. The pre-requisite is that the file should +// have been already pinned using the PinFiles function. This function can +// be called only from an external command. +func (p *API) UnpinFiles(addr []byte, credentials string) error { + fileInfo, err := p.getPinnedFile(addr) + if err != nil { + log.Error("Root hash is not pinned", "rootHash", hex.EncodeToString(addr), "err", err) + return err + } + + // Walk the root hash and unpin all the chunks + walkerFunction := func(ref storage.Reference) error { + chunkAddr := p.removeDecryptionKeyFromChunkHash(ref) + err := p.db.Set(context.Background(), chunk.ModeSetUnpin, chunkAddr) + if err != nil { + log.Error("Could not unpin chunk", "Address", hex.EncodeToString(chunkAddr)) + return err + } else { + log.Trace("Unpinning chunk", "Address", hex.EncodeToString(chunkAddr)) + } + return nil + } + err = p.walkChunksFromRootHash(addr, fileInfo.isRaw, credentials, walkerFunction) + if err != nil { + log.Error("Error walking root hash.", "Hash", hex.EncodeToString(addr), "err", err) + return nil + } + + // Delete or Update the state DB + pinCounter, err := p.getPinCounterOfChunk(chunk.Address(p.removeDecryptionKeyFromChunkHash(addr))) + if err != nil { + err := p.removePinnedFile(addr) + if err != nil { + log.Error("Error unpinning file.", "rootHash", hex.EncodeToString(addr), "err", err) + return nil + } + } else { + fileInfo.pinCounter = pinCounter + err = p.savePinnedFile(addr, fileInfo) + if err != nil { + log.Error("Error updating file info to state store.", "rootHash", hex.EncodeToString(addr), "err", err) + return nil + } + } + + log.Debug("File unpinned", "Address", hex.EncodeToString(addr)) + return nil +} + +// ListPinFiles functions logs information of all the files that are pinned +// in the current local node. 
It displays the root hash of the pinned file +// or collection. It also display three vital information's +// 1) Whether the file is a RAW file or not +// 2) Size of the pinned file or collection +// 3) the number of times that particular file or collection is pinned. +func (p *API) ListPinFiles() (map[string]FileInfo, error) { + pinnedFiles := make(map[string]FileInfo) + iterFunc := func(key []byte, value []byte) { + hash := string(key[4:]) + fileInfo := FileInfo{} + err := fileInfo.UnmarshalBinary(value) + if err != nil { + log.Debug("Error unmarshaling fileinfo from state store", "Address", hash) + } + log.Trace("Pinned file", "Address", hash, "IsRAW", fileInfo.isRaw, + "fileSize", fileInfo.fileSize, "pinCounter", fileInfo.pinCounter) + pinnedFiles[hash] = fileInfo + } + err := p.state.Iterate("pin_", iterFunc) + if err != nil { + log.Error("Error iterating pinned files", "err", err) + return nil, err + } + return pinnedFiles, nil +} + +func (p *API) walkChunksFromRootHash(addr []byte, isRaw bool, credentials string, + executeFunc func(storage.Reference) error) error { + + fileHashesC := make(chan storage.Reference, WorkerChanSize) + fileErrC := make(chan error) + var fwg sync.WaitGroup // wait group for file walker reoutine to complete + + fwg.Add(1) + go func() { + defer fwg.Done() + if !isRaw { + // If it is not a raw file... 
load the manifest and add the files inside one by one + walker, err := p.api.NewManifestWalker(context.Background(), storage.Address(addr), + p.api.Decryptor(context.Background(), credentials), nil) + if err != nil { + log.Error("Could not decode manifest.", "err", err) + fileErrC <- err + return + } + + err = walker.Walk(func(entry *api.ManifestEntry) error { + fileAddr, err := hex.DecodeString(entry.Hash) + if err != nil { + log.Error("Error decoding hash present in manifest", "err", err) + return err + } + + // send the file to file workers + fileHashesC <- storage.Reference(fileAddr) + return nil + }) + if err != nil { + log.Error("Error walking manifest", "err", err) + fileErrC <- err + return + } + + // Finally, add the root manifest hash too + fileHashesC <- storage.Reference(addr) + + // Signal end of file hash stream + close(fileHashesC) + } else { + // Its a raw file.. no manifest.. so process only this hash + fileHashesC <- storage.Reference(addr) + + // Signal end of file hash + close(fileHashesC) + } + }() + + fwg.Add(1) + go func() { + defer fwg.Done() + for { + select { + case fileRef, ok := <-fileHashesC: + if !ok { + return + } + // Walk the file and its chunks + err := p.walkFile(fileRef, executeFunc, addr) + if err != nil { + fileErrC <- err + return + } + + // got error from manifest walker goroutine, so quit file walker too + case <-fileErrC: + return + } + } + }() + + go func() { + // Wait for all the chunks to finish execution + fwg.Wait() + + // close internal error channel after the file routine is done + close(fileErrC) + }() + + return <-fileErrC +} + +func (p *API) walkFile(fileRef storage.Reference, executeFunc func(storage.Reference) error, addr []byte) error { + chunkHashesC := make(chan storage.Reference, WorkerChanSize) + chunkErrC := make(chan error) + var cwg sync.WaitGroup // Wait group to wait for chunk routines to complete + actualFileSize := uint64(0) + rcvdFileSize := uint64(0) + var fileSizeLock sync.Mutex // lock to protect 
the fileSize variables + doneChunkWorker := make(chan struct{}) + + hashFunc := storage.MakeHashFunc(storage.DefaultHash) + hashSize := len(addr) + isEncrypted := len(addr) > hashFunc().Size() + getter := storage.NewHasherStore(p.db, hashFunc, isEncrypted, chunk.NewTag(0, "show-chunks-tag", 0)) + + // Trigger unwrapping the merkle tree starting from root hash of the file + chunkHashesC <- fileRef + +QuitChunkFor: + for { + select { + case <-doneChunkWorker: + break QuitChunkFor + case ref := <-chunkHashesC: + cwg.Add(1) + go func() { + defer cwg.Done() + chunkData, err := getter.Get(context.Background(), ref) + if err != nil { + log.Error("Error getting chunk data from localstore.", + "Address", hex.EncodeToString(ref), "err", err) + chunkErrC <- err + close(doneChunkWorker) + return + } + + datalen := len(chunkData) + if datalen < 9 { // Atleast 1 data byte. first 8 bytes are address + log.Error("Invalid chunk data from localstore.", + "Address", hex.EncodeToString(ref), "err", err) + chunkErrC <- errInvalidChunkData + close(doneChunkWorker) + return + } + + subTreeSize := chunkData.Size() + fileSizeLock.Lock() + if actualFileSize < subTreeSize { + actualFileSize = subTreeSize + } + fileSizeLock.Unlock() + + if subTreeSize > chunk.DefaultSize { + // this is a tree chunk + // load the tree's branches + branches := (datalen - 8) / hashSize + for i := 0; i < branches; i++ { + brAddr := make([]byte, hashSize) + start := (i * hashSize) + 8 + end := ((i + 1) * hashSize) + 8 + copy(brAddr[:], chunkData[start:end]) + chunkHashesC <- storage.Reference(brAddr) + } + } else { + // this is a data chunk + fileSizeLock.Lock() + rcvdFileSize = rcvdFileSize + chunk.DefaultSize + got := rcvdFileSize + need := actualFileSize + fileSizeLock.Unlock() + if got >= need { + close(doneChunkWorker) + } + } + + // process the chunk (pin / unpin / display) + err = executeFunc(ref) + if err != nil { + // TODO: if this happens, we should go back and revert the entire file's chunks + 
log.Error("Error executing walker function", + "Address", hex.EncodeToString(ref), "err", err) + chunkErrC <- err + close(doneChunkWorker) + } + }() + } + } + + func() { + // Wait for all the chunks to finish execution + cwg.Wait() + + // close internal error channel after all routines are done + close(chunkErrC) + }() + + return <-chunkErrC +} + +func (p *API) removeDecryptionKeyFromChunkHash(ref []byte) []byte { + // remove the decryption key from the encrypted file hash + isEncrypted := len(ref) > p.hashSize + if isEncrypted { + chunkAddr := make([]byte, p.hashSize) + copy(chunkAddr, ref[0:p.hashSize]) + return chunkAddr + } + return ref +} + +func (p *API) getPinCounterOfChunk(addr chunk.Address) (uint64, error) { + pinnedChunk, err := p.db.Get(context.Background(), chunk.ModeGetPin, p.removeDecryptionKeyFromChunkHash(addr)) + if err != nil { + return 0, err + } + return pinnedChunk.PinCounter(), nil +} + +func (p *API) savePinnedFile(addr []byte, fileInfo FileInfo) error { + key := "pin_" + hex.EncodeToString(addr) + err := p.state.Put(key, &fileInfo) + return err +} + +func (p *API) removePinnedFile(addr []byte) error { + key := "pin_" + hex.EncodeToString(addr) + err := p.state.Delete(key) + return err +} + +func (p *API) getPinnedFile(addr []byte) (FileInfo, error) { + key := "pin_" + hex.EncodeToString(addr) + fileInfo := FileInfo{} + err := p.state.Get(key, &fileInfo) + return fileInfo, err +} diff --git a/storage/pin/pin_test.go b/storage/pin/pin_test.go new file mode 100644 index 0000000000..0d4711f975 --- /dev/null +++ b/storage/pin/pin_test.go @@ -0,0 +1,543 @@ +// Copyright 2019 The Swarm Authors +// This file is part of the Swarm library. +// +// The Swarm library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. 
+// +// The Swarm library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the Swarm library. If not, see . + +package pin + +import ( + "bytes" + "context" + "encoding/hex" + "io/ioutil" + "mime" + "os" + "path/filepath" + "sync" + "testing" + "time" + + "github.com/ethersphere/swarm/api" + "github.com/ethersphere/swarm/chunk" + "github.com/ethersphere/swarm/state" + "github.com/ethersphere/swarm/storage" + "github.com/ethersphere/swarm/storage/feed" + "github.com/ethersphere/swarm/storage/localstore" + "github.com/ethersphere/swarm/testutil" +) + +// TestPinRawUpload pins a RAW file and unpin it multiple times +func TestPinRawUpload(t *testing.T) { + p, f, closeFunc := getPinApiAndFileStore(t) + defer closeFunc() + + data := testutil.RandomBytes(1, 10000) + hash := uploadFile(t, f, data, false) + + // test pin and unpin + pinUnpinAndFailIfError(t, p, hash, 3, true) +} + +// TestPinRawUploadEncrypted pins a encrypted RAW file and unpin it multiple times +func TestPinRawUploadEncrypted(t *testing.T) { + p, f, closeFunc := getPinApiAndFileStore(t) + defer closeFunc() + + data := testutil.RandomBytes(2, 10000) + hash := uploadFile(t, f, data, true) + + // test pin and unpin + pinUnpinAndFailIfError(t, p, hash, 3, true) +} + +// TestPinCollectionUpload pins a simple collection and unpin it multiple times +func TestPinCollectionUpload(t *testing.T) { + p, f, closeFunc := getPinApiAndFileStore(t) + defer closeFunc() + + hash := uploadCollection(t, p, f, false) + + // test pin and unpin + pinUnpinAndFailIfError(t, p, hash, 3, false) +} + +// TestPinCollectionUploadEncrypted pins a encrypted simple collection and unpin it multiple times +func TestPinCollectionUploadEncrypted(t *testing.T) { 
+ p, f, closeFunc := getPinApiAndFileStore(t) + defer closeFunc() + + hash := uploadCollection(t, p, f, true) + + // test pin and unpin + pinUnpinAndFailIfError(t, p, hash, 3, false) +} + +// TestWalker tests the walkChunksFromRootHash function which is the crux of +// commands like pin, unpin & list. +func TestWalker(t *testing.T) { + sizes := []int{1, 4095, 4096, 4097, 123456} + for i := range sizes { + p, f, closeFunc := getPinApiAndFileStore(t) + defer closeFunc() + + data := testutil.RandomBytes(1, sizes[i]) + hash := uploadFile(t, f, data, false) + + addrs, err := f.GetAllReferences(context.TODO(), bytes.NewReader(data)) + if err != nil { + t.Fatalf("Error getting original chunks of the file") + } + + // Function to collect the walked chunks + walkedChunks := make(map[string]uint64) + var lock = sync.RWMutex{} + walkerFunction := func(ref storage.Reference) error { + chunkAddr := p.removeDecryptionKeyFromChunkHash(ref) + lock.Lock() + defer lock.Unlock() + walkedChunks[hex.EncodeToString(chunkAddr)] = 0 + return nil + } + err = p.walkChunksFromRootHash(hash, true, "", walkerFunction) + if err != nil { + t.Fatalf("Walker error for hash %s", hash) + } + + // Check if the number of chunks and chunk addresses match + if len(addrs) != len(walkedChunks) { + t.Fatalf("Expected number of chunks to be %d, but is %d", len(addrs), len(walkedChunks)) + } + for _, hash := range addrs { + if _, ok := walkedChunks[hex.EncodeToString(hash)]; !ok { + t.Fatalf("Expected chunk %s not present", hash) + } + } + } +} + +// TestListPinInfo tests the ListPinFiles command by pinning and unpinning a collection +// twice and check if this gets reflected properly in the data structure +func TestListPinInfo(t *testing.T) { + p, f, closeFunc := getPinApiAndFileStore(t) + defer closeFunc() + + hash := uploadCollection(t, p, f, false) + + // Pin the hash for the first time + err := p.PinFiles(hash, false, "") + if err != nil { + t.Fatalf("Could not pin " + err.Error()) + } + + // Get the 
list of pinned files by calling the ListPinFiles command + pinInfo, err := p.ListPinFiles() + if err != nil { + t.Fatalf("Error executing ListPinFiles command") + } + + // Check if the uploaded collection is in the list files data structure + fileInfo, ok := pinInfo[hex.EncodeToString(hash)] + if !ok { + t.Fatalf("uploaded collection not pinned") + } + if fileInfo.pinCounter != 1 { + t.Fatalf("pincounter expected is 1 got is %d", fileInfo.pinCounter) + } + if fileInfo.isRaw { + t.Fatalf("isRaw expected is false got is true") + } + + // Pin it once more and check if the counters increases + err = p.PinFiles(hash, false, "") + if err != nil { + t.Fatalf("Could not pin " + err.Error()) + } + + // Get the list of pinned files by calling the ListPinFiles command + pinInfo, err = p.ListPinFiles() + if err != nil { + t.Fatalf("Error executing ListPinFiles command") + } + fileInfo, ok = pinInfo[hex.EncodeToString(hash)] + if !ok { + t.Fatalf("hash not pinned ") + } + if fileInfo.pinCounter != 2 { + t.Fatalf("pincounter expected is 2 got is %d", fileInfo.pinCounter) + } + + // Unpin it and check if the counter decrements + err = p.UnpinFiles(hash, "") + if err != nil { + t.Fatalf("Could not unpin " + err.Error()) + } + + // Get the list of pinned files by calling the ListPinFiles command + pinInfo, err = p.ListPinFiles() + if err != nil { + t.Fatalf("Error executing ListPinFiles command") + } + fileInfo, ok = pinInfo[hex.EncodeToString(hash)] + if !ok { + t.Fatalf("collection totally unpinned") + } + if fileInfo.pinCounter != 1 { + t.Fatalf("pincounter expected is 1 got is %d", fileInfo.pinCounter) + } + + // Unpin it final time and the entry should not be there + err = p.UnpinFiles(hash, "") + if err != nil { + t.Fatalf("Could not unpin " + err.Error()) + } + + // Get the list of pinned files by calling the ListPinFiles command + pinInfo, err = p.ListPinFiles() + if err != nil { + t.Fatalf("Error executing ListPinFiles command") + } + _, ok = 
pinInfo[hex.EncodeToString(hash)] + if ok { + t.Fatalf("uploaded collection is still pinned") + } +} + +func getPinApiAndFileStore(t *testing.T) (*API, *storage.FileStore, func()) { + t.Helper() + + swarmDir, err := ioutil.TempDir("", "swarm-storage-test") + if err != nil { + t.Fatalf("could not create temp dir. Error: %s", err.Error()) + } + + stateStore, err := state.NewDBStore(filepath.Join(swarmDir, "state-store.db")) + if err != nil { + t.Fatalf("could not create state store. Error: %s", err.Error()) + } + + lStore, err := localstore.New(swarmDir, make([]byte, 32), nil) + if err != nil { + t.Fatalf("could not create localstore. Error: %s", err.Error()) + } + tags := chunk.NewTags() + fileStore := storage.NewFileStore(lStore, storage.NewFileStoreParams(), tags) + + // Swarm feeds test setup + feedsDir, err := ioutil.TempDir("", "swarm-feeds-test") + if err != nil { + t.Fatal(err) + } + + feeds, err := feed.NewTestHandler(feedsDir, &feed.HandlerParams{}) + if err != nil { + t.Fatal(err) + } + + swarmApi := api.NewAPI(fileStore, nil, feeds.Handler, nil, tags) + pinAPI := NewAPI(lStore, stateStore, nil, tags, swarmApi) + + closeFunc := func() { + err := stateStore.Close() + if err != nil { + t.Fatalf("Could not close state store") + } + err = lStore.Close() + if err != nil { + t.Fatalf("Could not close localStore") + } + feeds.Close() + err = os.RemoveAll(feedsDir) + if err != nil { + t.Fatalf("Could not remove swarm feeds dir") + } + err = os.RemoveAll(swarmDir) + if err != nil { + t.Fatalf("Could not remove swarm temp dir") + } + } + + return pinAPI, fileStore, closeFunc +} + +func uploadFile(t *testing.T, f *storage.FileStore, data []byte, toEncrypt bool) storage.Address { + t.Helper() + + size := int64(len(data)) + ctx := context.TODO() + addr, wait, err := f.Store(ctx, bytes.NewReader(data), size, toEncrypt) + if err != nil { + t.Fatalf("could not store file. 
Error: %s", err.Error()) + } + + err = wait(ctx) + if err != nil { + t.Fatalf("Store wait error: %v", err.Error()) + } + return addr +} + +type testFileInfo struct { + fileName string + fileHash storage.Address +} + +func uploadCollection(t *testing.T, p *API, f *storage.FileStore, toEncrypt bool) storage.Address { + file1hash := uploadFile(t, f, testutil.RandomBytes(1, 10000), toEncrypt) + file2hash := uploadFile(t, f, testutil.RandomBytes(2, 10000), toEncrypt) + file3hash := uploadFile(t, f, testutil.RandomBytes(3, 10000), toEncrypt) + file4hash := uploadFile(t, f, testutil.RandomBytes(4, 10000), toEncrypt) + file5hash := uploadFile(t, f, testutil.RandomBytes(5, 10000), toEncrypt) + file6hash := uploadFile(t, f, testutil.RandomBytes(6, 10000), toEncrypt) + file7hash := uploadFile(t, f, testutil.RandomBytes(7, 10000), toEncrypt) + file8hash := uploadFile(t, f, testutil.RandomBytes(8, 10000), toEncrypt) + + var testDirFiles = []testFileInfo{ + {"file1.txt", file1hash}, + {"file2.txt", file2hash}, + {"dir1/file3.txt", file3hash}, + {"dir1/file4.txt", file4hash}, + {"dir2/file5.txt", file5hash}, + {"dir2/dir3/file6.txt", file6hash}, + {"dir2/dir4/file7.txt", file7hash}, + {"dir2/dir4/file8.txt", file8hash}, + } + + newAddr, err := p.api.NewManifest(context.TODO(), toEncrypt) + if err != nil { + t.Fatalf("could not create new manifest error: %v", err.Error()) + } + + // Simulate "swarm --recursive up" + newAddr, err = p.api.UpdateManifest(context.TODO(), newAddr, func(mw *api.ManifestWriter) error { + + for _, fileInfo := range testDirFiles { + entry := &api.ManifestEntry{ + Hash: hex.EncodeToString(fileInfo.fileHash), + Path: fileInfo.fileName, + ContentType: mime.TypeByExtension(filepath.Ext(fileInfo.fileName)), + } + _, err = mw.AddEntry(context.TODO(), nil, entry) + if err != nil { + t.Fatalf("could not create new manifest error: %v", err.Error()) + } + } + return nil + }) + + return newAddr +} + +// This function is called from test after a file is pinned. 
+// It check if the file's chunks are properly pinned. +// Assumption is that the file is uploaded in an empty database so that it can be easily tested. +// It takes the root hash and expected pinCounter value. +// It also has some hacks to take care of existing issues in the way we upload. +// +// The check process is as follows +// 1) Check if the root hash is present in the pinnedFile map +// 2) Check if all the files's chunks are in pinIndex +// a) Get all the chunks +// get it from retrievalDataIndex +// since the assumption is the DB has only this file, it gives all the file's chunks. +// getAllRefs cannot be used here as it does not give the chunks that belong to manifests. +// b) Get all chunks that are pinned (from pinIndex) +// In every upload.. an empty manifest is uploaded. that why add this hash to this list +// c) Check if both the above lists are equal +// 3) Check if all the chunks pinned have the proper pinCounter +// - This is just a simple go through of all the pinned chunks list and check if the counter is +// equal to the pin counter given as argument +func failIfNotPinned(t *testing.T, p *API, rootHash []byte, pinCounter uint64, isRaw bool) { + t.Helper() + + // 1 - Check if the root hash is pinned in state store + _, err := p.getPinnedFile(rootHash) + if err != nil { + t.Fatalf("File %s not pinned in state store", rootHash) + } + + // 2a - Get all the chunks of the file from retrievalDataIndex (since the file is uploaded in an empty database) + chunksInDB := p.getAllChunksFromDB(t) + + // 2b - Get pinned chunks details from pinning indexes + pinnedChunks := p.collectPinnedChunks(t, rootHash, "", isRaw) + if !isRaw { + // Add the empty manifest chunk + pinnedChunks["8b634aea26eec353ac0ecbec20c94f44d6f8d11f38d4578a4c207a84c74ef731"] = pinCounter + } + + // 2c - Check if number of chunk hashes are same + if len(chunksInDB) != len(pinnedChunks) { + t.Fatalf("Expected number of chunks to be %d, but is %d", len(chunksInDB), len(pinnedChunks)) + } + 
+ // 2c - Check if all the chunk address are same + // tolerate 1 chunk failure for dummy manifest on encrypted collection + tolerateOneChunkFailure := true + for hash := range chunksInDB { + if _, ok := pinnedChunks[hash]; !ok { + if !isRaw && tolerateOneChunkFailure { + tolerateOneChunkFailure = false + continue + } + t.Fatalf("Expected chunk %s not present", hash) + } + } + + // 3 - Check for pin counter correctness + if pinCounter != 0 { + for _, pc := range pinnedChunks { + if pc != pinCounter { + t.Fatalf("Expected pin counter %d got %d", pinCounter, pc) + } + } + } + + pinnedFiles, err := p.ListPinFiles() + if err != nil { + t.Fatalf("Could not load pin state from state store") + } + + fileInfo, ok := pinnedFiles[hex.EncodeToString(rootHash)] + if !ok { + t.Fatalf("Fileinfo not present in state store") + } + + if fileInfo.isRaw != isRaw { + t.Fatalf("Invalid isRaw state in fileInfo") + } + + if fileInfo.pinCounter != pinCounter { + t.Fatalf("Invalid pincounter expected %d got %d", pinCounter, fileInfo.pinCounter) + } +} + +func failIfNotUnpinned(t *testing.T, p *API, rootHash []byte, isRaw bool) { + t.Helper() + + // root hash should not be in state DB + _, err := p.getPinnedFile(rootHash) + if err == nil { + t.Fatalf("File %s pinned in pinFilesIndex", rootHash) + } + + // The chunks of this file should not be in pinIndex too + pinnedChunks := p.collectPinnedChunks(t, rootHash, "", isRaw) + if len(pinnedChunks) != 0 { + t.Fatalf("Chunks of this file present in pinIndex") + } +} + +func pinUnpinAndFailIfError(t *testing.T, p *API, rootHash []byte, noOfPinUnpin int, isRaw bool) { + t.Helper() + + // Pin the file and check if it is pinned + for i := 0; i < noOfPinUnpin; i++ { + err := p.PinFiles(rootHash, isRaw, "") + if err != nil { + t.Fatalf("Could not pin " + err.Error()) + } + pinCounter := uint64(i + 1) + failIfNotPinned(t, p, rootHash, pinCounter, isRaw) + } + + // Unpin and see if the file is unpinned + for i := noOfPinUnpin; i > 0; i-- { + err := 
p.UnpinFiles(rootHash, "") + if err != nil { + t.Fatalf("Could not unpin " + err.Error()) + } + + if i == 1 { + // Final unpinning + failIfNotUnpinned(t, p, rootHash, isRaw) + } else { + // Check if the pin counter is decremented + pinCounter := uint64(i - 1) + failIfNotPinned(t, p, rootHash, pinCounter, isRaw) + } + } +} + +// collectPinnedChunks is used to collect all the chunks that are pinned as part of the +// given root hash. +func (p *API) collectPinnedChunks(t *testing.T, rootHash []byte, credentials string, isRaw bool) map[string]uint64 { + t.Helper() + + pinnedChunks := make(map[string]uint64) + var lock = sync.RWMutex{} + walkerFunction := func(ref storage.Reference) error { + chunkAddr := p.removeDecryptionKeyFromChunkHash(ref) + pinCounter, err := p.getPinCounterOfChunk(chunk.Address(chunkAddr)) + if err != nil { + if err == chunk.ErrChunkNotFound { + return nil + } else { + return err + } + } + lock.Lock() + defer lock.Unlock() + pinnedChunks[hex.EncodeToString(chunkAddr)] = pinCounter + return nil + } + err := p.walkChunksFromRootHash(rootHash, isRaw, credentials, walkerFunction) + if err != nil { + t.Fatal("Error during walking") + } + + return pinnedChunks +} + +// getAllChunksFromDB is used in testing to generate the truth dataset about all the chunks +// that are present in the DB. 
+func (p *API) getAllChunksFromDB(t *testing.T) map[string]int { + t.Helper() + + var addrLock sync.RWMutex + addrs := make(map[string]int) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) + defer cancel() + var wg sync.WaitGroup + + for bin := uint8(0); bin < uint8(chunk.MaxPO); bin++ { + ch, stop := p.db.SubscribePull(ctx, bin, 0, 0) + defer stop() + + wg.Add(1) + go getChunks(t, bin, addrs, &addrLock, ch, &wg, ctx) + } + + wg.Wait() + return addrs +} + +func getChunks(t *testing.T, bin uint8, addrs map[string]int, addrLock *sync.RWMutex, ch <-chan chunk.Descriptor, wg *sync.WaitGroup, ctx context.Context) { + t.Helper() + + defer wg.Done() + for { + select { + case got, ok := <-ch: + if !ok { + return + } + addrLock.Lock() + addrs[hex.EncodeToString(got.Address)] = 0 + addrLock.Unlock() + case <-ctx.Done(): + return + } + } +} diff --git a/swarm.go b/swarm.go index 14edcc3064..35fc014fec 100644 --- a/swarm.go +++ b/swarm.go @@ -32,11 +32,6 @@ import ( "time" "unicode" - "github.com/ethersphere/swarm/chunk" - - "github.com/ethersphere/swarm/storage/feed" - "github.com/ethersphere/swarm/storage/localstore" - "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/ethclient" @@ -46,6 +41,7 @@ import ( "github.com/ethereum/go-ethereum/rpc" "github.com/ethersphere/swarm/api" httpapi "github.com/ethersphere/swarm/api/http" + "github.com/ethersphere/swarm/chunk" "github.com/ethersphere/swarm/contracts/chequebook" "github.com/ethersphere/swarm/contracts/ens" "github.com/ethersphere/swarm/fuse" @@ -56,7 +52,10 @@ import ( "github.com/ethersphere/swarm/pss" "github.com/ethersphere/swarm/state" "github.com/ethersphere/swarm/storage" + "github.com/ethersphere/swarm/storage/feed" + "github.com/ethersphere/swarm/storage/localstore" "github.com/ethersphere/swarm/storage/mock" + "github.com/ethersphere/swarm/storage/pin" "github.com/ethersphere/swarm/swap" 
"github.com/ethersphere/swarm/tracing" ) @@ -85,6 +84,7 @@ type Swarm struct { stateStore *state.DBStore accountingMetrics *protocols.AccountingMetrics cleanupFuncs []func() error + pinAPI *pin.API // API object implements all pinning related commands tracerClose io.Closer } @@ -233,6 +233,9 @@ func NewSwarm(config *api.Config, mockStore *mock.NodeStore) (self *Swarm, err e self.api = api.NewAPI(self.fileStore, self.dns, feedsHandler, self.privateKey, tags) + // Instantiate the pinAPI object with the already opened localstore + self.pinAPI = pin.NewAPI(localStore, self.stateStore, self.config.FileStoreParams, tags, self.api) + self.sfs = fuse.NewSwarmFS(self.api) log.Debug("Initialized FUSE filesystem") @@ -382,11 +385,10 @@ func (s *Swarm) Start(srv *p2p.Server) error { if s.ps != nil { s.ps.Start(srv) } - // start swarm http proxy server if s.config.Port != "" { addr := net.JoinHostPort(s.config.ListenAddr, s.config.Port) - server := httpapi.NewServer(s.api, s.config.Cors) + server := httpapi.NewServer(s.api, s.pinAPI, s.config.Cors) if s.config.Cors != "" { log.Info("Swarm HTTP proxy CORS headers", "allowedOrigins", s.config.Cors) @@ -529,6 +531,12 @@ func (s *Swarm) APIs() []rpc.API { Service: protocols.NewAccountingApi(s.accountingMetrics), Public: false, }, + { + Namespace: "pin", + Version: pin.Version, + Service: s.pinAPI, + Public: false, + }, } apis = append(apis, s.bzz.APIs()...)