diff --git a/badger/backup_test.go b/badger/backup_test.go
index d2fd83972..fc8d2b8db 100644
--- a/badger/backup_test.go
+++ b/badger/backup_test.go
@@ -125,7 +125,7 @@ func TestBackupRestore2(t *testing.T) {
 	s2Path := filepath.Join(tmpdir, "test2")
 	s3Path := filepath.Join(tmpdir, "test3")
 
-	db1, err := Open(DefaultOptions(s1Path))
+	db1, err := Open(getTestOptions(s1Path))
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -160,7 +160,7 @@ func TestBackupRestore2(t *testing.T) {
 	}
 	fmt.Println("backup1 length:", backup.Len())
 
-	db2, err := Open(DefaultOptions(s2Path))
+	db2, err := Open(getTestOptions(s2Path))
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -211,7 +211,7 @@ func TestBackupRestore2(t *testing.T) {
 		t.Fatal(err)
 	}
 	fmt.Println("backup2 length:", backup.Len())
-	db3, err := Open(DefaultOptions(s3Path))
+	db3, err := Open(getTestOptions(s3Path))
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -454,7 +454,7 @@ func TestBackupLoadIncremental(t *testing.T) {
 	require.True(t, bb.Len() > 0)
 
 	// restore
-	db2, err := Open(DefaultOptions(filepath.Join(tmpdir, "restore2")))
+	db2, err := Open(getTestOptions(filepath.Join(tmpdir, "restore2")))
 	if err != nil {
 		t.Fatal(err)
 	}
diff --git a/badger/db.go b/badger/db.go
index 30a3725fc..016e10826 100644
--- a/badger/db.go
+++ b/badger/db.go
@@ -35,6 +35,7 @@ import (
 	"github.com/dgraph-io/badger/skl"
 	"github.com/dgraph-io/badger/table"
 	"github.com/dgraph-io/badger/y"
+	"github.com/dgraph-io/ristretto"
 	humanize "github.com/dustin/go-humanize"
 	"github.com/pkg/errors"
 	"golang.org/x/net/trace"
@@ -88,8 +89,9 @@ type DB struct {
 
 	orc *oracle
 
-	pub      *publisher
-	registry *KeyRegistry
+	pub        *publisher
+	registry   *KeyRegistry
+	blockCache *ristretto.Cache
 }
 
 const (
@@ -277,6 +279,18 @@ func Open(opt Options) (db *DB, err error) {
 		elog = trace.NewEventLog("Badger", "DB")
 	}
 
+	config := ristretto.Config{
+		// Use 5% of cache memory for storing counters.
+		NumCounters: int64(float64(opt.MaxCacheSize) * 0.05 * 2),
+		MaxCost:     int64(float64(opt.MaxCacheSize) * 0.95),
+		BufferItems: 64,
+		// Enable metrics once https://github.com/dgraph-io/ristretto/issues/92 is resolved.
+		Metrics: false,
+	}
+	cache, err := ristretto.NewCache(&config)
+	if err != nil {
+		return nil, errors.Wrap(err, "failed to create cache")
+	}
 	db = &DB{
 		imm:           make([]*skl.Skiplist, 0, opt.NumMemtables),
 		flushChan:     make(chan flushTask, opt.NumMemtables),
@@ -288,6 +302,7 @@ func Open(opt Options) (db *DB, err error) {
 		valueDirGuard: valueDirLockGuard,
 		orc:           newOracle(opt),
 		pub:           newPublisher(),
+		blockCache:    cache,
 	}
 
 	krOpt := KeyRegistryOptions{
@@ -367,6 +382,14 @@ func Open(opt Options) (db *DB, err error) {
 	return db, nil
 }
 
+// CacheMetrics returns the metrics for the underlying cache.
+func (db *DB) CacheMetrics() *ristretto.Metrics {
+	return nil
+	// Do not enable ristretto metrics in badger until issue
+	// https://github.com/dgraph-io/ristretto/issues/92 is resolved.
+	// return db.blockCache.Metrics()
+}
+
 // Close closes a DB. It's crucial to call it to ensure all the pending updates make their way to
 // disk. Calling DB.Close() multiple times would still only close the DB once.
 func (db *DB) Close() error {
@@ -453,6 +476,7 @@ func (db *DB) close() (err error) {
 	db.elog.Printf("Waiting for closer")
 	db.closers.updateSize.SignalAndWait()
 	db.orc.Stop()
+	db.blockCache.Close()
 
 	db.elog.Finish()
 
@@ -909,6 +933,8 @@ func (db *DB) handleFlushTask(ft flushTask) error {
 	}
 	bopts := buildTableOptions(db.opt)
 	bopts.DataKey = dk
+	// Builder does not need cache but the same options are used for opening table.
+	bopts.Cache = db.blockCache
 	tableData := buildL0Table(ft, bopts)
 
 	fileID := db.lc.reserveFileID()
@@ -1447,6 +1473,7 @@ func (db *DB) dropAll() (func(), error) {
 	db.vhead = valuePointer{} // Zero it out.
 	db.lc.nextFileID = 1
 	db.opt.Infof("Deleted %d value log files. DropAll done.\n", num)
+	db.blockCache.Clear()
 	return resume, nil
 }
 
diff --git a/badger/db_test.go b/badger/db_test.go
index 1d88a7e3b..01cb092ff 100644
--- a/badger/db_test.go
+++ b/badger/db_test.go
@@ -74,7 +74,8 @@ func getTestOptions(dir string) Options {
 	opt := DefaultOptions(dir).
 		WithMaxTableSize(1 << 15). // Force more compaction.
 		WithLevelOneSize(4 << 15). // Force more compaction.
-		WithSyncWrites(false)
+		WithSyncWrites(false).
+		WithMaxCacheSize(10 << 20)
 	if !*mmap {
 		return opt.WithValueLogLoadingMode(options.FileIO)
 	}
@@ -1571,6 +1572,7 @@ func TestMinReadTs(t *testing.T) {
 }
 
 func TestGoroutineLeak(t *testing.T) {
+	time.Sleep(1 * time.Second)
 	before := runtime.NumGoroutine()
 	t.Logf("Num go: %d", before)
 	for i := 0; i < 12; i++ {
@@ -1602,6 +1604,7 @@ func TestGoroutineLeak(t *testing.T) {
 			require.Equal(t, true, updated)
 		})
 	}
+	time.Sleep(2 * time.Second)
 	require.Equal(t, before, runtime.NumGoroutine())
 }
 
diff --git a/badger/go.mod b/badger/go.mod
index c52fb3ace..5b9b05c72 100644
--- a/badger/go.mod
+++ b/badger/go.mod
@@ -6,7 +6,7 @@ require (
 	github.com/DataDog/zstd v1.4.1
 	github.com/cespare/xxhash v1.1.0
 	github.com/cespare/xxhash/v2 v2.1.0 // indirect
-	github.com/dgraph-io/ristretto v0.0.0-20190916120426-cd2835491e0e
+	github.com/dgraph-io/ristretto v0.0.0-20191010170704-2ba187ef9534
 	github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2
 	github.com/dustin/go-humanize v1.0.0
 	github.com/golang/protobuf v1.3.1
diff --git a/badger/go.sum b/badger/go.sum
index 4a485aafa..5ce15406f 100644
--- a/badger/go.sum
+++ b/badger/go.sum
@@ -24,6 +24,8 @@ github.com/dgraph-io/ristretto v0.0.0-20190903064322-eb48d2f7ca30 h1:FkdGlqxPjfH
 github.com/dgraph-io/ristretto v0.0.0-20190903064322-eb48d2f7ca30/go.mod h1:UvZmzj8odp3S1nli6yEb1vLME8iJFBrRcw8rAJEiu9Q=
 github.com/dgraph-io/ristretto v0.0.0-20190916120426-cd2835491e0e h1:6ryPDbNhRiZ7Hbh8G3IxUaw3utIzI2FZEP5FBl8x8n0=
 github.com/dgraph-io/ristretto v0.0.0-20190916120426-cd2835491e0e/go.mod h1:3dYWm7+Szhwp+wbqGQ5w8o8H1eBkPJEjIEAR5fXCCfs=
+github.com/dgraph-io/ristretto v0.0.0-20191010170704-2ba187ef9534 h1:9G6fVccQriMJu4nXwpwLDoy9y31t/KUSLAbPcoBgv+4=
+github.com/dgraph-io/ristretto v0.0.0-20191010170704-2ba187ef9534/go.mod h1:edzKIzGvqUCMzhTVWbiTSe75zD9Xxq0GtSBtFmaUTZs=
 github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA=
 github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
 github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo=
diff --git a/badger/levels.go b/badger/levels.go
index 8b278416f..1ef9da7dc 100644
--- a/badger/levels.go
+++ b/badger/levels.go
@@ -158,6 +158,7 @@ func newLevelsController(db *DB, mf *Manifest) (*levelsController, error) {
 			// Set compression from table manifest.
 			topt.Compression = tf.Compression
 			topt.DataKey = dk
+			topt.Cache = db.blockCache
 			t, err := table.OpenTable(fd, topt)
 			if err != nil {
 				if strings.HasPrefix(err.Error(), "CHECKSUM_MISMATCH:") {
@@ -509,6 +510,8 @@ func (s *levelsController) compactBuildTables(
 		}
 		bopts := buildTableOptions(s.kv.opt)
 		bopts.DataKey = dk
+		// Builder does not need cache but the same options are used for opening table.
+		bopts.Cache = s.kv.blockCache
 		builder := table.NewTableBuilder(bopts)
 		var numKeys, numSkips uint64
 		for ; it.Valid(); it.Next() {
diff --git a/badger/managed_db_test.go b/badger/managed_db_test.go
index 93e3a1de3..3ef8f7b1a 100644
--- a/badger/managed_db_test.go
+++ b/badger/managed_db_test.go
@@ -87,7 +87,7 @@ func TestDropAllManaged(t *testing.T) {
 	require.NoError(t, db.DropAll()) // Just call it twice, for fun.
 	require.Equal(t, 0, numKeysManaged(db, math.MaxUint64))
 
-	// Check that we can still write to mdb, and using lower timestamps.
+	// Check that we can still write to db, and using lower timestamps.
 	populate(db, 1)
 	require.Equal(t, int(N), numKeysManaged(db, math.MaxUint64))
 	require.NoError(t, db.Close())
diff --git a/badger/options.go b/badger/options.go
index fd14b0279..c88e298bc 100644
--- a/badger/options.go
+++ b/badger/options.go
@@ -57,9 +57,12 @@ type Options struct {
 	MaxLevels           int
 	ValueThreshold      int
 	NumMemtables        int
-	BlockSize           int
-	BloomFalsePositive  float64
-	KeepL0InMemory      bool
+	// Changing BlockSize across DB runs will not break badger. The block size is
+	// read from the block index stored at the end of the table.
+	BlockSize          int
+	BloomFalsePositive float64
+	KeepL0InMemory     bool
+	MaxCacheSize       int64
 
 	NumLevelZeroTables      int
 	NumLevelZeroTablesStall int
@@ -118,6 +121,7 @@ func DefaultOptions(path string) Options {
 		KeepL0InMemory:          true,
 		VerifyValueChecksum:     false,
 		Compression:             options.ZSTD,
+		MaxCacheSize:            1 << 30, // 1 GB
 		// Nothing to read/write value log using standard File I/O
 		// MemoryMap to mmap() the value log files
 		// (2^30 - 1)*2 when mmapping < 2^31 - 1, max int32.
@@ -499,3 +503,12 @@ func (opt Options) WithVerifyValueChecksum(val bool) Options {
 	opt.VerifyValueChecksum = val
 	return opt
 }
+
+// WithMaxCacheSize returns a new Options value with MaxCacheSize set to the given value.
+//
+// This value specifies how much data cache should hold in memory. A small size of cache means lower
+// memory consumption and lookups/iterations would take longer.
+func (opt Options) WithMaxCacheSize(size int64) Options {
+	opt.MaxCacheSize = size
+	return opt
+}
diff --git a/badger/stream_writer.go b/badger/stream_writer.go
index b59eb9bb3..7bea6568a 100644
--- a/badger/stream_writer.go
+++ b/badger/stream_writer.go
@@ -410,6 +410,7 @@ func (w *sortedWriter) createTable(builder *table.Builder) error {
 	}
 	opts := buildTableOptions(w.db.opt)
 	opts.DataKey = builder.DataKey()
+	opts.Cache = w.db.blockCache
 	tbl, err := table.OpenTable(fd, opts)
 	if err != nil {
 		return err
diff --git a/badger/stream_writer_test.go b/badger/stream_writer_test.go
index 332d42387..7b7300a20 100644
--- a/badger/stream_writer_test.go
+++ b/badger/stream_writer_test.go
@@ -51,8 +51,8 @@ func getSortedKVList(valueSize, listSize int) *pb.KVList {
 
 // check if we can read values after writing using stream writer
 func TestStreamWriter1(t *testing.T) {
-	normalModeOpts := DefaultOptions("")
-	managedModeOpts := DefaultOptions("")
+	normalModeOpts := getTestOptions("")
+	managedModeOpts := getTestOptions("")
 	managedModeOpts.managedTxns = true
 
 	for _, opts := range []*Options{&normalModeOpts, &managedModeOpts} {
@@ -92,8 +92,8 @@ func TestStreamWriter1(t *testing.T) {
 
 // write more keys to db after writing keys using stream writer
 func TestStreamWriter2(t *testing.T) {
-	normalModeOpts := DefaultOptions("")
-	managedModeOpts := DefaultOptions("")
+	normalModeOpts := getTestOptions("")
+	managedModeOpts := getTestOptions("")
 	managedModeOpts.managedTxns = true
 
 	for _, opts := range []*Options{&normalModeOpts, &managedModeOpts} {
@@ -144,8 +144,8 @@ func TestStreamWriter2(t *testing.T) {
 }
 
 func TestStreamWriter3(t *testing.T) {
-	normalModeOpts := DefaultOptions("")
-	managedModeOpts := DefaultOptions("")
+	normalModeOpts := getTestOptions("")
+	managedModeOpts := getTestOptions("")
 	managedModeOpts.managedTxns = true
 
 	for _, opts := range []*Options{&normalModeOpts, &managedModeOpts} {
diff --git a/badger/table/table.go b/badger/table/table.go
index 7c0b89118..bb1861905 100644
--- a/badger/table/table.go
+++ b/badger/table/table.go
@@ -20,6 +20,7 @@ import (
 	"crypto/aes"
 	"fmt"
 	"io"
+	"math"
 	"os"
 	"path"
 	"path/filepath"
@@ -27,6 +28,7 @@ import (
 	"strings"
 	"sync"
 	"sync/atomic"
+	"unsafe"
 
 	"github.com/DataDog/zstd"
 	"github.com/golang/protobuf/proto"
@@ -36,10 +38,12 @@ import (
 	"github.com/dgraph-io/badger/options"
 	"github.com/dgraph-io/badger/pb"
 	"github.com/dgraph-io/badger/y"
+	"github.com/dgraph-io/ristretto"
 	"github.com/dgraph-io/ristretto/z"
 )
 
 const fileSuffix = ".sst"
+const intSize = int(unsafe.Sizeof(int(0)))
 
 // Options contains configurable options for Table/Builder.
 type Options struct {
@@ -64,6 +68,8 @@ type Options struct {
 
 	// Compression indicates the compression algorithm used for block compression.
 	Compression options.CompressionType
+
+	Cache *ristretto.Cache
 }
 
 // TableInterface is useful for testing.
@@ -149,6 +155,11 @@ type block struct {
 	chkLen            int // checksum length
 }
 
+func (b *block) size() int64 {
+	return int64(3*intSize /* Size of the offset, entriesIndexStart and chkLen */ +
+		cap(b.data) + cap(b.checksum) + cap(b.entryOffsets)*4)
+}
+
 func (b block) verifyCheckSum() error {
 	cs := &pb.Checksum{}
 	if err := proto.Unmarshal(b.checksum, cs); err != nil {
@@ -348,7 +359,13 @@ func (t *Table) block(idx int) (*block, error) {
 	if idx >= len(t.blockIndex) {
 		return nil, errors.New("block out of index")
 	}
-
+	if t.opt.Cache != nil {
+		key := t.blockCacheKey(idx)
+		blk, ok := t.opt.Cache.Get(key)
+		if ok && blk != nil {
+			return blk.(*block), nil
+		}
+	}
 	ko := t.blockIndex[idx]
 	blk := &block{
 		offset: int(ko.Offset),
@@ -407,7 +424,17 @@ func (t *Table) block(idx int) (*block, error) {
 			return nil, err
 		}
 	}
-	return blk, err
+	if t.opt.Cache != nil {
+		key := t.blockCacheKey(idx)
+		t.opt.Cache.Set(key, blk, blk.size())
+	}
+	return blk, nil
+}
+
+func (t *Table) blockCacheKey(idx int) uint64 {
+	y.AssertTrue(t.ID() < math.MaxUint32)
+	y.AssertTrue(idx < math.MaxUint32)
+	return (t.ID() << 32) | uint64(idx)
 }
 
 // Size is its file size in bytes
diff --git a/badger/table/table_test.go b/badger/table/table_test.go
index cbb120fa6..8c8de96b4 100644
--- a/badger/table/table_test.go
+++ b/badger/table/table_test.go
@@ -31,6 +31,7 @@ import (
 	"github.com/cespare/xxhash"
 	"github.com/dgraph-io/badger/options"
 	"github.com/dgraph-io/badger/y"
+	"github.com/dgraph-io/ristretto"
 	"github.com/stretchr/testify/require"
 )
 
@@ -47,7 +48,6 @@ func getTestTableOptions() Options {
 	return Options{
 		Compression:        options.ZSTD,
 		LoadingMode:        options.LoadToRAM,
-		ChkMode:            options.OnTableAndBlockRead,
 		BlockSize:          4 * 1024,
 		BloomFalsePositive: 0.01,
 	}
@@ -738,6 +738,7 @@ func TestTableChecksum(t *testing.T) {
 	rb := make([]byte, 100)
 	rand.Read(rb)
 	opts := getTestTableOptions()
+	opts.ChkMode = options.OnTableAndBlockRead
 	f := buildTestTable(t, "k", 10000, opts)
 	fi, err := f.Stat()
 	require.NoError(t, err, "unable to get file information")
@@ -749,9 +750,16 @@ func TestTableChecksum(t *testing.T) {
 	}
 }
 
+var cacheConfig = ristretto.Config{
+	NumCounters: 1000000 * 10,
+	MaxCost:     1000000,
+	BufferItems: 64,
+	Metrics:     true,
+}
+
 func BenchmarkRead(b *testing.B) {
 	n := int(5 * 1e6)
-	tbl := getTableForBenchmarks(b, n)
+	tbl := getTableForBenchmarks(b, n, nil)
 	defer tbl.DecrRef()
 
 	b.ResetTimer()
@@ -768,7 +776,9 @@ func BenchmarkRead(b *testing.B) {
 
 func BenchmarkReadAndBuild(b *testing.B) {
 	n := int(5 * 1e6)
-	tbl := getTableForBenchmarks(b, n)
+
+	var cache, _ = ristretto.NewCache(&cacheConfig)
+	tbl := getTableForBenchmarks(b, n, cache)
 	defer tbl.DecrRef()
 
 	b.ResetTimer()
@@ -776,6 +786,7 @@ func BenchmarkReadAndBuild(b *testing.B) {
 	for i := 0; i < b.N; i++ {
 		func() {
 			opts := Options{Compression: options.ZSTD, BlockSize: 4 * 0124, BloomFalsePositive: 0.01}
+			opts.Cache = cache
 			newBuilder := NewTableBuilder(opts)
 			it := tbl.NewIterator(false)
 			defer it.Close()
@@ -794,9 +805,14 @@ func BenchmarkReadMerged(b *testing.B) {
 	y.AssertTrue((n % m) == 0)
 	tableSize := n / m
 	var tables []*Table
+
+	var cache, err = ristretto.NewCache(&cacheConfig)
+	require.NoError(b, err)
+
 	for i := 0; i < m; i++ {
 		filename := fmt.Sprintf("%s%s%d.sst", os.TempDir(), string(os.PathSeparator), rand.Int63())
 		opts := Options{Compression: options.ZSTD, BlockSize: 4 * 1024, BloomFalsePositive: 0.01}
+		opts.Cache = cache
 		builder := NewTableBuilder(opts)
 		f, err := y.OpenSyncedFile(filename, true)
 		y.Check(err)
@@ -855,7 +871,7 @@ func BenchmarkChecksum(b *testing.B) {
 
 func BenchmarkRandomRead(b *testing.B) {
 	n := int(5 * 1e6)
-	tbl := getTableForBenchmarks(b, n)
+	tbl := getTableForBenchmarks(b, n, nil)
 	defer tbl.DecrRef()
 
 	r := rand.New(rand.NewSource(time.Now().Unix()))
@@ -880,9 +896,15 @@ func BenchmarkRandomRead(b *testing.B) {
 	}
 }
 
-func getTableForBenchmarks(b *testing.B, count int) *Table {
+func getTableForBenchmarks(b *testing.B, count int, cache *ristretto.Cache) *Table {
 	rand.Seed(time.Now().Unix())
 	opts := Options{Compression: options.ZSTD, BlockSize: 4 * 1024, BloomFalsePositive: 0.01}
+	if cache == nil {
+		var err error
+		cache, err = ristretto.NewCache(&cacheConfig)
+		require.NoError(b, err)
+	}
+	opts.Cache = cache
 	builder := NewTableBuilder(opts)
 	filename := fmt.Sprintf("%s%s%d.sst", os.TempDir(), string(os.PathSeparator), rand.Int63())
 	f, err := y.OpenSyncedFile(filename, true)