Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

gx update and fix code to use new Cid type #10

Merged
merged 2 commits into from
Sep 12, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gx/lastpubver
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.0.15: QmVDDgboX5nPUE4pBcK2xC1b9XbStA4t2KrUWBRMr9AiFd
1.1.0: QmQPWVDYeWvxN75cP4MGrbMVpADm2XqpM4KxgvbxkYk16u
30 changes: 15 additions & 15 deletions bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
network: network,
findKeys: make(chan *blockRequest, sizeBatchRequestChan),
process: px,
newBlocks: make(chan *cid.Cid, HasBlockBufferSize),
provideKeys: make(chan *cid.Cid, provideKeysBufferSize),
newBlocks: make(chan cid.Cid, HasBlockBufferSize),
provideKeys: make(chan cid.Cid, provideKeysBufferSize),
wm: NewWantManager(ctx, network),
counters: new(counters),

Expand Down Expand Up @@ -146,9 +146,9 @@ type Bitswap struct {
// newBlocks is a channel for newly added blocks to be provided to the
// network. blocks pushed down this channel get buffered and fed to the
// provideKeys channel later on to avoid too much network activity
newBlocks chan *cid.Cid
newBlocks chan cid.Cid
// provideKeys directly feeds provide workers
provideKeys chan *cid.Cid
provideKeys chan cid.Cid

process process.Process

Expand Down Expand Up @@ -179,18 +179,18 @@ type counters struct {
}

type blockRequest struct {
Cid *cid.Cid
Cid cid.Cid
Ctx context.Context
}

// GetBlock attempts to retrieve a particular block from peers within the
// deadline enforced by the context.
func (bs *Bitswap) GetBlock(parent context.Context, k *cid.Cid) (blocks.Block, error) {
func (bs *Bitswap) GetBlock(parent context.Context, k cid.Cid) (blocks.Block, error) {
return getBlock(parent, k, bs.GetBlocks)
}

func (bs *Bitswap) WantlistForPeer(p peer.ID) []*cid.Cid {
var out []*cid.Cid
func (bs *Bitswap) WantlistForPeer(p peer.ID) []cid.Cid {
var out []cid.Cid
for _, e := range bs.engine.WantlistForPeer(p) {
out = append(out, e.Cid)
}
Expand All @@ -208,7 +208,7 @@ func (bs *Bitswap) LedgerForPeer(p peer.ID) *decision.Receipt {
// NB: Your request remains open until the context expires. To conserve
// resources, provide a context with a reasonably short deadline (ie. not one
// that lasts throughout the lifetime of the server)
func (bs *Bitswap) GetBlocks(ctx context.Context, keys []*cid.Cid) (<-chan blocks.Block, error) {
func (bs *Bitswap) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks.Block, error) {
if len(keys) == 0 {
out := make(chan blocks.Block)
close(out)
Expand Down Expand Up @@ -259,7 +259,7 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []*cid.Cid) (<-chan block
return
}

bs.CancelWants([]*cid.Cid{blk.Cid()}, mses)
bs.CancelWants([]cid.Cid{blk.Cid()}, mses)
remaining.Remove(blk.Cid())
select {
case out <- blk:
Expand Down Expand Up @@ -288,7 +288,7 @@ func (bs *Bitswap) getNextSessionID() uint64 {
}

// CancelWant removes a given key from the wantlist
func (bs *Bitswap) CancelWants(cids []*cid.Cid, ses uint64) {
func (bs *Bitswap) CancelWants(cids []cid.Cid, ses uint64) {
if len(cids) == 0 {
return
}
Expand Down Expand Up @@ -326,7 +326,7 @@ func (bs *Bitswap) receiveBlockFrom(blk blocks.Block, from peer.ID) error {
bs.notifications.Publish(blk)

k := blk.Cid()
ks := []*cid.Cid{k}
ks := []cid.Cid{k}
for _, s := range bs.SessionsForBlock(k) {
s.receiveBlockFrom(from, blk)
bs.CancelWants(ks, s.id)
Expand All @@ -344,7 +344,7 @@ func (bs *Bitswap) receiveBlockFrom(blk blocks.Block, from peer.ID) error {
}

// SessionsForBlock returns a slice of all sessions that may be interested in the given cid
func (bs *Bitswap) SessionsForBlock(c *cid.Cid) []*Session {
func (bs *Bitswap) SessionsForBlock(c cid.Cid) []*Session {
bs.sessLk.Lock()
defer bs.sessLk.Unlock()

Expand Down Expand Up @@ -440,9 +440,9 @@ func (bs *Bitswap) Close() error {
return bs.process.Close()
}

func (bs *Bitswap) GetWantlist() []*cid.Cid {
func (bs *Bitswap) GetWantlist() []cid.Cid {
entries := bs.wm.wl.Entries()
out := make([]*cid.Cid, 0, len(entries))
out := make([]cid.Cid, 0, len(entries))
for _, e := range entries {
out = append(out, e.Cid)
}
Expand Down
12 changes: 6 additions & 6 deletions bitswap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
}
}

var blkeys []*cid.Cid
var blkeys []cid.Cid
first := instances[0]
for _, b := range blocks {
blkeys = append(blkeys, b.Cid())
Expand Down Expand Up @@ -253,7 +253,7 @@ func TestSendToWantingPeer(t *testing.T) {
// peerA requests and waits for block alpha
ctx, cancel := context.WithTimeout(context.Background(), waitTime)
defer cancel()
alphaPromise, err := peerA.Exchange.GetBlocks(ctx, []*cid.Cid{alpha.Cid()})
alphaPromise, err := peerA.Exchange.GetBlocks(ctx, []cid.Cid{alpha.Cid()})
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -285,7 +285,7 @@ func TestEmptyKey(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

_, err := bs.GetBlock(ctx, nil)
_, err := bs.GetBlock(ctx, cid.Cid{})
if err != blockstore.ErrNotFound {
t.Error("empty str key should return ErrNotFound")
}
Expand Down Expand Up @@ -393,15 +393,15 @@ func TestDoubleGet(t *testing.T) {
// through before the peers even get connected. This is okay, bitswap
// *should* be able to handle this.
ctx1, cancel1 := context.WithCancel(context.Background())
blkch1, err := instances[1].Exchange.GetBlocks(ctx1, []*cid.Cid{blocks[0].Cid()})
blkch1, err := instances[1].Exchange.GetBlocks(ctx1, []cid.Cid{blocks[0].Cid()})
if err != nil {
t.Fatal(err)
}

ctx2, cancel2 := context.WithCancel(context.Background())
defer cancel2()

blkch2, err := instances[1].Exchange.GetBlocks(ctx2, []*cid.Cid{blocks[0].Cid()})
blkch2, err := instances[1].Exchange.GetBlocks(ctx2, []cid.Cid{blocks[0].Cid()})
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -456,7 +456,7 @@ func TestWantlistCleanup(t *testing.T) {
bswap := instances.Exchange
blocks := bg.Blocks(20)

var keys []*cid.Cid
var keys []cid.Cid
for _, b := range blocks {
keys = append(keys, b.Cid())
}
Expand Down
6 changes: 3 additions & 3 deletions decision/ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,16 +76,16 @@ func (l *ledger) ReceivedBytes(n int) {
l.Accounting.BytesRecv += uint64(n)
}

func (l *ledger) Wants(k *cid.Cid, priority int) {
func (l *ledger) Wants(k cid.Cid, priority int) {
log.Debugf("peer %s wants %s", l.Partner, k)
l.wantList.Add(k, priority)
}

func (l *ledger) CancelWant(k *cid.Cid) {
func (l *ledger) CancelWant(k cid.Cid) {
l.wantList.Remove(k)
}

func (l *ledger) WantListContains(k *cid.Cid) (*wl.Entry, bool) {
func (l *ledger) WantListContains(k cid.Cid) (*wl.Entry, bool) {
return l.wantList.Contains(k)
}

Expand Down
10 changes: 5 additions & 5 deletions decision/peer_request_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type peerRequestQueue interface {
// Pop returns the next peerRequestTask. Returns nil if the peerRequestQueue is empty.
Pop() *peerRequestTask
Push(entry *wantlist.Entry, to peer.ID)
Remove(k *cid.Cid, p peer.ID)
Remove(k cid.Cid, p peer.ID)

// NB: cannot expose simply expose taskQueue.Len because trashed elements
// may exist. These trashed elements should not contribute to the count.
Expand Down Expand Up @@ -114,7 +114,7 @@ func (tl *prq) Pop() *peerRequestTask {
}

// Remove removes a task from the queue
func (tl *prq) Remove(k *cid.Cid, p peer.ID) {
func (tl *prq) Remove(k cid.Cid, p peer.ID) {
tl.lock.Lock()
t, ok := tl.taskMap[taskKey(p, k)]
if ok {
Expand Down Expand Up @@ -195,7 +195,7 @@ func (t *peerRequestTask) SetIndex(i int) {
}

// taskKey returns a key that uniquely identifies a task.
func taskKey(p peer.ID, k *cid.Cid) string {
func taskKey(p peer.ID, k cid.Cid) string {
return string(p) + k.KeyString()
}

Expand Down Expand Up @@ -281,15 +281,15 @@ func partnerCompare(a, b pq.Elem) bool {
}

// StartTask signals that a task was started for this partner
func (p *activePartner) StartTask(k *cid.Cid) {
func (p *activePartner) StartTask(k cid.Cid) {
p.activelk.Lock()
p.activeBlocks.Add(k)
p.active++
p.activelk.Unlock()
}

// TaskDone signals that a task was completed for this partner
func (p *activePartner) TaskDone(k *cid.Cid) {
func (p *activePartner) TaskDone(k cid.Cid) {
p.activelk.Lock()
p.activeBlocks.Remove(k)
p.active--
Expand Down
16 changes: 8 additions & 8 deletions get.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ import (
blockstore "github.com/ipfs/go-ipfs-blockstore"
)

type getBlocksFunc func(context.Context, []*cid.Cid) (<-chan blocks.Block, error)
type getBlocksFunc func(context.Context, []cid.Cid) (<-chan blocks.Block, error)

func getBlock(p context.Context, k *cid.Cid, gb getBlocksFunc) (blocks.Block, error) {
if k == nil {
log.Error("nil cid in GetBlock")
func getBlock(p context.Context, k cid.Cid, gb getBlocksFunc) (blocks.Block, error) {
if !k.Defined() {
log.Error("undefined cid in GetBlock")
return nil, blockstore.ErrNotFound
}

Expand All @@ -28,7 +28,7 @@ func getBlock(p context.Context, k *cid.Cid, gb getBlocksFunc) (blocks.Block, er
ctx, cancel := context.WithCancel(p)
defer cancel()

promise, err := gb(ctx, []*cid.Cid{k})
promise, err := gb(ctx, []cid.Cid{k})
if err != nil {
return nil, err
}
Expand All @@ -49,9 +49,9 @@ func getBlock(p context.Context, k *cid.Cid, gb getBlocksFunc) (blocks.Block, er
}
}

type wantFunc func(context.Context, []*cid.Cid)
type wantFunc func(context.Context, []cid.Cid)

func getBlocksImpl(ctx context.Context, keys []*cid.Cid, notif notifications.PubSub, want wantFunc, cwants func([]*cid.Cid)) (<-chan blocks.Block, error) {
func getBlocksImpl(ctx context.Context, keys []cid.Cid, notif notifications.PubSub, want wantFunc, cwants func([]cid.Cid)) (<-chan blocks.Block, error) {
if len(keys) == 0 {
out := make(chan blocks.Block)
close(out)
Expand All @@ -72,7 +72,7 @@ func getBlocksImpl(ctx context.Context, keys []*cid.Cid, notif notifications.Pub
return out, nil
}

func handleIncoming(ctx context.Context, remaining *cid.Set, in <-chan blocks.Block, out chan blocks.Block, cfun func([]*cid.Cid)) {
func handleIncoming(ctx context.Context, remaining *cid.Set, in <-chan blocks.Block, out chan blocks.Block, cfun func([]cid.Cid)) {
ctx, cancel := context.WithCancel(ctx)
defer func() {
cancel()
Expand Down
10 changes: 5 additions & 5 deletions message/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ type BitSwapMessage interface {
Blocks() []blocks.Block

// AddEntry adds an entry to the Wantlist.
AddEntry(key *cid.Cid, priority int)
AddEntry(key cid.Cid, priority int)

Cancel(key *cid.Cid)
Cancel(key cid.Cid)

Empty() bool

Expand Down Expand Up @@ -134,16 +134,16 @@ func (m *impl) Blocks() []blocks.Block {
return bs
}

func (m *impl) Cancel(k *cid.Cid) {
func (m *impl) Cancel(k cid.Cid) {
delete(m.wantlist, k.KeyString())
m.addEntry(k, 0, true)
}

func (m *impl) AddEntry(k *cid.Cid, priority int) {
func (m *impl) AddEntry(k cid.Cid, priority int) {
m.addEntry(k, priority, false)
}

func (m *impl) addEntry(c *cid.Cid, priority int, cancel bool) {
func (m *impl) addEntry(c cid.Cid, priority int, cancel bool) {
k := c.KeyString()
e, exists := m.wantlist[k]
if exists {
Expand Down
6 changes: 3 additions & 3 deletions message/message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
u "github.com/ipfs/go-ipfs-util"
)

func mkFakeCid(s string) *cid.Cid {
func mkFakeCid(s string) cid.Cid {
return cid.NewCidV0(u.Hash([]byte(s)))
}

Expand Down Expand Up @@ -67,7 +67,7 @@ func TestAppendBlock(t *testing.T) {
}

func TestWantlist(t *testing.T) {
keystrs := []*cid.Cid{mkFakeCid("foo"), mkFakeCid("bar"), mkFakeCid("baz"), mkFakeCid("bat")}
keystrs := []cid.Cid{mkFakeCid("foo"), mkFakeCid("bar"), mkFakeCid("baz"), mkFakeCid("bat")}
m := New(true)
for _, s := range keystrs {
m.AddEntry(s, 1)
Expand Down Expand Up @@ -163,7 +163,7 @@ func TestToAndFromNetMessage(t *testing.T) {
}
}

func wantlistContains(wantlist *pb.Message_Wantlist, c *cid.Cid) bool {
func wantlistContains(wantlist *pb.Message_Wantlist, c cid.Cid) bool {
for _, e := range wantlist.GetEntries() {
if bytes.Equal(e.GetBlock(), c.Bytes()) {
return true
Expand Down
4 changes: 2 additions & 2 deletions network/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ type Receiver interface {

type Routing interface {
// FindProvidersAsync returns a channel of providers for the given key
FindProvidersAsync(context.Context, *cid.Cid, int) <-chan peer.ID
FindProvidersAsync(context.Context, cid.Cid, int) <-chan peer.ID

// Provide provides the key to the network
Provide(context.Context, *cid.Cid) error
Provide(context.Context, cid.Cid) error
}
4 changes: 2 additions & 2 deletions network/ipfs_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func (bsnet *impl) ConnectTo(ctx context.Context, p peer.ID) error {
}

// FindProvidersAsync returns a channel of providers for the given key
func (bsnet *impl) FindProvidersAsync(ctx context.Context, k *cid.Cid, max int) <-chan peer.ID {
func (bsnet *impl) FindProvidersAsync(ctx context.Context, k cid.Cid, max int) <-chan peer.ID {

// Since routing queries are expensive, give bitswap the peers to which we
// have open connections. Note that this may cause issues if bitswap starts
Expand Down Expand Up @@ -174,7 +174,7 @@ func (bsnet *impl) FindProvidersAsync(ctx context.Context, k *cid.Cid, max int)
}

// Provide provides the key to the network
func (bsnet *impl) Provide(ctx context.Context, k *cid.Cid) error {
func (bsnet *impl) Provide(ctx context.Context, k cid.Cid) error {
return bsnet.routing.Provide(ctx, k, true)
}

Expand Down
6 changes: 3 additions & 3 deletions notifications/notifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ const bufferSize = 16

type PubSub interface {
Publish(block blocks.Block)
Subscribe(ctx context.Context, keys ...*cid.Cid) <-chan blocks.Block
Subscribe(ctx context.Context, keys ...cid.Cid) <-chan blocks.Block
Shutdown()
}

Expand Down Expand Up @@ -61,7 +61,7 @@ func (ps *impl) Shutdown() {
// Subscribe returns a channel of blocks for the given |keys|. |blockChannel|
// is closed if the |ctx| times out or is cancelled, or after sending len(keys)
// blocks.
func (ps *impl) Subscribe(ctx context.Context, keys ...*cid.Cid) <-chan blocks.Block {
func (ps *impl) Subscribe(ctx context.Context, keys ...cid.Cid) <-chan blocks.Block {

blocksCh := make(chan blocks.Block, len(keys))
valuesCh := make(chan interface{}, len(keys)) // provide our own channel to control buffer, prevent blocking
Expand Down Expand Up @@ -121,7 +121,7 @@ func (ps *impl) Subscribe(ctx context.Context, keys ...*cid.Cid) <-chan blocks.B
return blocksCh
}

func toStrings(keys []*cid.Cid) []string {
func toStrings(keys []cid.Cid) []string {
strs := make([]string, 0, len(keys))
for _, key := range keys {
strs = append(strs, key.KeyString())
Expand Down
4 changes: 2 additions & 2 deletions notifications/notifications_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,8 @@ func TestDoesNotDeadLockIfContextCancelledBeforePublish(t *testing.T) {

t.Log("generate a large number of blocks. exceed default buffer")
bs := g.Blocks(1000)
ks := func() []*cid.Cid {
var keys []*cid.Cid
ks := func() []cid.Cid {
var keys []cid.Cid
for _, b := range bs {
keys = append(keys, b.Cid())
}
Expand Down
Loading