From 22949f0a1d698c01512e5542674ecc185ee8ffa6 Mon Sep 17 00:00:00 2001 From: "Masih H. Derkani" Date: Wed, 1 Sep 2021 14:21:41 +0100 Subject: [PATCH] Implement new index type that also includes mutltihash code Implement a new CARv2 index that contains enough information to reconstruct the multihashes of the data payload, since `CarIndexSorted` only includes multihash digests. Note, this index intentionally ignores any given record with `multihash.IDENTITY` CID hash. Add a test that asserts offsets for the same CID across sorted index and new multihash sorted index are consistent. Note, there is a need for a multicodec to be defined for the new index type. For now TODOs are left since it requires coordination across repos. --- v2/index/index.go | 3 + v2/index/indexmhsorted.go | 174 ++++++++++++++++++++++++++++++++++++++ v2/index_gen_test.go | 131 ++++++++++++++++++++++++---- 3 files changed, 293 insertions(+), 15 deletions(-) create mode 100644 v2/index/indexmhsorted.go diff --git a/v2/index/index.go b/v2/index/index.go index 3408dfbd..563f911e 100644 --- a/v2/index/index.go +++ b/v2/index/index.go @@ -72,6 +72,9 @@ func New(codec multicodec.Code) (Index, error) { switch codec { case multicodec.CarIndexSorted: return newSorted(), nil + // TODO replace with proper multicodec once defined. + case IndexMultihashSortedCodec: + return newMultihashSorted(), nil default: return nil, fmt.Errorf("unknwon index codec: %v", codec) } diff --git a/v2/index/indexmhsorted.go b/v2/index/indexmhsorted.go new file mode 100644 index 00000000..faabcd15 --- /dev/null +++ b/v2/index/indexmhsorted.go @@ -0,0 +1,174 @@ +package index + +import ( + "encoding/binary" + "io" + "sort" + + "github.com/ipfs/go-cid" + "github.com/multiformats/go-multicodec" + "github.com/multiformats/go-multihash" +) + +// TODO replace with propper multicodec once defined. +const IndexMultihashSortedCodec = multicodec.Code(0x0401) + +// multiWidthCodedIndex maps multihash code (i.e. hashing algorithm) to singleWidthIndex. +// This index type is implemented with the underlying assumption that all digests generated by the +// same multihash code are of the same length. +// This index ignores any Record with multihash.IDENTITY. +type multiWidthCodedIndex map[uint64]*singleWidthCodedIndex + +type singleWidthCodedIndex struct { + singleWidthIndex + code uint64 +} + +func (m *singleWidthCodedIndex) Marshal(w io.Writer) error { + if err := binary.Write(w, binary.LittleEndian, m.code); err != nil { + return err + } + return m.singleWidthIndex.Marshal(w) +} + +func (m *singleWidthCodedIndex) Unmarshal(r io.Reader) error { + if err := binary.Read(r, binary.LittleEndian, &m.code); err != nil { + return err + } + return m.singleWidthIndex.Unmarshal(r) +} + +func (m *multiWidthCodedIndex) Codec() multicodec.Code { + // TODO introduce this codec to mutlicodec table once finalized. + return IndexMultihashSortedCodec +} + +func (m *multiWidthCodedIndex) Marshal(w io.Writer) error { + if err := binary.Write(w, binary.LittleEndian, int32(len(*m))); err != nil { + return err + } + // The codes are unique, but ranging over a map isn't deterministic. + // As per the CARv2 spec, we must order buckets by digest length. + // TODO update CARv2 spec to reflect this for the new index type. + codes := m.sortedKeys() + + for _, code := range codes { + swci := (*m)[code] + if err := swci.Marshal(w); err != nil { + return err + } + } + return nil +} + +func (m *multiWidthCodedIndex) sortedKeys() []uint64 { + codes := make([]uint64, 0, len(*m)) + for code := range *m { + codes = append(codes, code) + } + sort.Slice(codes, func(i, j int) bool { + return codes[i] < codes[j] + }) + return codes +} + +func (m *multiWidthCodedIndex) Unmarshal(r io.Reader) error { + var l int32 + if err := binary.Read(r, binary.LittleEndian, &l); err != nil { + return err + } + for i := 0; i < int(l); i++ { + swci := &singleWidthCodedIndex{} + if err := swci.Unmarshal(r); err != nil { + return err + } + m.put(swci) + } + return nil +} + +func (m *multiWidthCodedIndex) put(swci *singleWidthCodedIndex) { + (*m)[swci.code] = swci +} + +func (m *multiWidthCodedIndex) Load(records []Record) error { + // Split cids on their digest length + byCode := make(map[uint64][]digestRecord) + for _, item := range records { + dmh, err := multihash.Decode(item.Hash()) + if err != nil { + return err + } + + code := dmh.Code + + // Ignore IDENTITY multihashes in the index. + if code == multihash.IDENTITY { + continue + } + digest := dmh.Digest + swi, ok := byCode[code] + if !ok { + swi = make([]digestRecord, 0) + byCode[code] = swi + } + + byCode[code] = append(swi, digestRecord{digest, item.Offset}) + } + + // Sort each list. then write to compact form. + for code, lst := range byCode { + sort.Sort(recordSet(lst)) + + // None of the lists can possibly be empty at this point; so we grab the first one + width := len(lst[0].digest) + + // TODO: refactor compaction as a receiver on singleWidthIndex + swci := newSingleWidthCodedIndex(width, lst, code) + m.put(swci) + } + return nil +} + +func newSingleWidthCodedIndex(width int, lst []digestRecord, code uint64) *singleWidthCodedIndex { + // TODO refactor duplicate compaction code in singleWidthIndex type + rcrdWdth := width + 8 + compact := make([]byte, rcrdWdth*len(lst)) + for off, itm := range lst { + itm.write(compact[off*rcrdWdth : (off+1)*rcrdWdth]) + } + swci := &singleWidthCodedIndex{ + singleWidthIndex: singleWidthIndex{ + width: uint32(rcrdWdth), + len: uint64(len(lst)), + index: compact, + }, + code: code, + } + return swci +} + +func (m *multiWidthCodedIndex) GetAll(cid cid.Cid, f func(uint64) bool) error { + hash := cid.Hash() + dmh, err := multihash.Decode(hash) + if err != nil { + return err + } + swci, err := m.get(dmh) + if err != nil { + return err + } + return swci.getAll(dmh.Digest, f) +} + +func (m *multiWidthCodedIndex) get(dmh *multihash.DecodedMultihash) (*singleWidthCodedIndex, error) { + if codedIdx, ok := (*m)[dmh.Code]; ok { + return codedIdx, nil + } + return nil, ErrNotFound +} + +func newMultihashSorted() Index { + index := make(multiWidthCodedIndex) + return &index +} diff --git a/v2/index_gen_test.go b/v2/index_gen_test.go index 058d07e6..e41f6d5a 100644 --- a/v2/index_gen_test.go +++ b/v2/index_gen_test.go @@ -1,9 +1,20 @@ -package car +package car_test import ( + "fmt" + "io" "os" "testing" + "github.com/multiformats/go-multihash" + + "github.com/ipfs/go-cid" + carv2 "github.com/ipld/go-car/v2" + "github.com/ipld/go-car/v2/internal/carv1" + internalio "github.com/ipld/go-car/v2/internal/io" + "github.com/multiformats/go-multicodec" + "github.com/multiformats/go-varint" + "github.com/ipld/go-car/v2/index" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -13,19 +24,19 @@ func TestReadOrGenerateIndex(t *testing.T) { tests := []struct { name string carPath string - readOpts []ReadOption + readOpts []carv2.ReadOption wantIndexer func(t *testing.T) index.Index wantErr bool }{ { "CarV1IsIndexedAsExpected", "testdata/sample-v1.car", - []ReadOption{}, + []carv2.ReadOption{}, func(t *testing.T) index.Index { v1, err := os.Open("testdata/sample-v1.car") require.NoError(t, err) defer v1.Close() - want, err := GenerateIndex(v1) + want, err := carv2.GenerateIndex(v1) require.NoError(t, err) return want }, @@ -34,12 +45,12 @@ func TestReadOrGenerateIndex(t *testing.T) { { "CarV2WithIndexIsReturnedAsExpected", "testdata/sample-wrapped-v2.car", - []ReadOption{}, + []carv2.ReadOption{}, func(t *testing.T) index.Index { v2, err := os.Open("testdata/sample-wrapped-v2.car") require.NoError(t, err) defer v2.Close() - reader, err := NewReader(v2) + reader, err := carv2.NewReader(v2) require.NoError(t, err) want, err := index.ReadFrom(reader.IndexReader()) require.NoError(t, err) @@ -50,12 +61,12 @@ func TestReadOrGenerateIndex(t *testing.T) { { "CarV1WithZeroLenSectionIsGeneratedAsExpected", "testdata/sample-v1-with-zero-len-section.car", - []ReadOption{ZeroLengthSectionAsEOF(true)}, + []carv2.ReadOption{carv2.ZeroLengthSectionAsEOF(true)}, func(t *testing.T) index.Index { v1, err := os.Open("testdata/sample-v1-with-zero-len-section.car") require.NoError(t, err) defer v1.Close() - want, err := GenerateIndex(v1, ZeroLengthSectionAsEOF(true)) + want, err := carv2.GenerateIndex(v1, carv2.ZeroLengthSectionAsEOF(true)) require.NoError(t, err) return want }, @@ -64,12 +75,12 @@ func TestReadOrGenerateIndex(t *testing.T) { { "AnotherCarV1WithZeroLenSectionIsGeneratedAsExpected", "testdata/sample-v1-with-zero-len-section2.car", - []ReadOption{ZeroLengthSectionAsEOF(true)}, + []carv2.ReadOption{carv2.ZeroLengthSectionAsEOF(true)}, func(t *testing.T) index.Index { v1, err := os.Open("testdata/sample-v1-with-zero-len-section2.car") require.NoError(t, err) defer v1.Close() - want, err := GenerateIndex(v1, ZeroLengthSectionAsEOF(true)) + want, err := carv2.GenerateIndex(v1, carv2.ZeroLengthSectionAsEOF(true)) require.NoError(t, err) return want }, @@ -78,14 +89,14 @@ func TestReadOrGenerateIndex(t *testing.T) { { "CarV1WithZeroLenSectionWithoutOptionIsError", "testdata/sample-v1-with-zero-len-section.car", - []ReadOption{}, + []carv2.ReadOption{}, func(t *testing.T) index.Index { return nil }, true, }, { "CarOtherThanV1OrV2IsError", "testdata/sample-rootless-v42.car", - []ReadOption{}, + []carv2.ReadOption{}, func(t *testing.T) index.Index { return nil }, true, }, @@ -95,7 +106,7 @@ func TestReadOrGenerateIndex(t *testing.T) { carFile, err := os.Open(tt.carPath) require.NoError(t, err) t.Cleanup(func() { assert.NoError(t, carFile.Close()) }) - got, err := ReadOrGenerateIndex(carFile, tt.readOpts...) + got, err := carv2.ReadOrGenerateIndex(carFile, tt.readOpts...) if tt.wantErr { require.Error(t, err) } else { @@ -121,7 +132,7 @@ func TestGenerateIndexFromFile(t *testing.T) { v1, err := os.Open("testdata/sample-v1.car") require.NoError(t, err) defer v1.Close() - want, err := GenerateIndex(v1) + want, err := carv2.GenerateIndex(v1) require.NoError(t, err) return want }, @@ -142,7 +153,7 @@ func TestGenerateIndexFromFile(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got, err := GenerateIndexFromFile(tt.carPath) + got, err := carv2.GenerateIndexFromFile(tt.carPath) if tt.wantErr { require.Error(t, err) } else { @@ -153,3 +164,93 @@ func TestGenerateIndexFromFile(t *testing.T) { }) } } + +func TestMultihashIndexSortedConsistencyWithIndexSorted(t *testing.T) { + path := "testdata/sample-v1.car" + + sortedIndex, err := carv2.GenerateIndexFromFile(path) + require.NoError(t, err) + require.Equal(t, multicodec.CarIndexSorted, sortedIndex.Codec()) + + f, err := os.Open(path) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, f.Close()) }) + br, err := carv2.NewBlockReader(f) + require.NoError(t, err) + + subject := generateMultihashSortedIndex(t, path) + for { + wantNext, err := br.Next() + if err == io.EOF { + break + } + require.NoError(t, err) + + dmh, err := multihash.Decode(wantNext.Cid().Hash()) + require.NoError(t, err) + if dmh.Code == multihash.IDENTITY { + continue + } + + wantCid := wantNext.Cid() + var wantOffsets []uint64 + err = sortedIndex.GetAll(wantCid, func(o uint64) bool { + wantOffsets = append(wantOffsets, o) + fmt.Println(wantCid) + return false + }) + require.NoError(t, err) + + var gotOffsets []uint64 + err = subject.GetAll(wantCid, func(o uint64) bool { + gotOffsets = append(gotOffsets, o) + fmt.Println(wantCid) + return false + }) + fmt.Println("----------------") + require.NoError(t, err) + require.Equal(t, wantOffsets, gotOffsets) + } +} + +func generateMultihashSortedIndex(t *testing.T, path string) index.Index { + f, err := os.Open(path) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, f.Close()) }) + reader := internalio.ToByteReadSeeker(f) + header, err := carv1.ReadHeader(reader) + require.NoError(t, err) + require.Equal(t, uint64(1), header.Version) + + idx, err := index.New(index.IndexMultihashSortedCodec) + require.NoError(t, err) + records := make([]index.Record, 0) + + var sectionOffset int64 + sectionOffset, err = reader.Seek(0, io.SeekCurrent) + require.NoError(t, err) + + for { + sectionLen, err := varint.ReadUvarint(reader) + if err == io.EOF { + break + } + require.NoError(t, err) + + if sectionLen == 0 { + break + } + + cidLen, c, err := cid.CidFromReader(reader) + require.NoError(t, err) + records = append(records, index.Record{Cid: c, Offset: uint64(sectionOffset)}) + remainingSectionLen := int64(sectionLen) - int64(cidLen) + sectionOffset, err = reader.Seek(remainingSectionLen, io.SeekCurrent) + require.NoError(t, err) + } + + err = idx.Load(records) + require.NoError(t, err) + + return idx +}