Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
0x5459 committed Sep 5, 2023
1 parent d7c5033 commit 382457c
Show file tree
Hide file tree
Showing 19 changed files with 165 additions and 58 deletions.
2 changes: 1 addition & 1 deletion damocles-manager/core/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ type SealerAPI interface {
AchieveUnsealSector(ctx context.Context, sid abi.SectorID, pieceCid cid.Cid, errInfo string) (Meta, error)
AcquireUnsealDest(ctx context.Context, sid abi.SectorID, pieceCid cid.Cid) ([]string, error)

StoreUri(ctx context.Context, storeName string, resource string) (string, error)
StoreUri(ctx context.Context, storeName, resource string, minerID uint64) (string, error)
}

type SealerCliAPI interface {
Expand Down
2 changes: 1 addition & 1 deletion damocles-manager/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ require (
github.com/golang/mock v1.6.0
github.com/hako/durafmt v0.0.0-20200710122514-c0fb7b4da026
github.com/hashicorp/go-multierror v1.1.1
github.com/ipfs-force-community/damocles/manager-plugin v0.0.0-20230901024916-a92456f84db2
github.com/ipfs-force-community/damocles/manager-plugin v0.0.0-20230905053730-d7c5033eb6eb
github.com/ipfs-force-community/venus-cluster-assets v0.1.0
github.com/ipfs/boxo v0.10.1
github.com/ipfs/go-cid v0.4.1
Expand Down
4 changes: 2 additions & 2 deletions damocles-manager/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -572,8 +572,8 @@ github.com/icza/mighty v0.0.0-20180919140131-cfd07d671de6 h1:8UsGZ2rr2ksmEru6lTo
github.com/icza/mighty v0.0.0-20180919140131-cfd07d671de6/go.mod h1:xQig96I1VNBDIWGCdTt54nHt6EeI639SmHycLYL7FkA=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo=
github.com/ipfs-force-community/damocles/manager-plugin v0.0.0-20230901024916-a92456f84db2 h1:iarakGdwdvuaGJv8cbV3NHv7RseaeuDD/PzPoeajy60=
github.com/ipfs-force-community/damocles/manager-plugin v0.0.0-20230901024916-a92456f84db2/go.mod h1:me1u2cl7qdxBCZiVL0laDop8uBHDdUwlUNnQ7KkHF64=
github.com/ipfs-force-community/damocles/manager-plugin v0.0.0-20230905053730-d7c5033eb6eb h1:GPWQEuzTnwDjaTeZxzM9sxsMFiHGA10jR3ZNNOoZOnA=
github.com/ipfs-force-community/damocles/manager-plugin v0.0.0-20230905053730-d7c5033eb6eb/go.mod h1:EpGeK7b251iv7L5TnHl1PJGFH4KbliE03ctYt5thy6c=
github.com/ipfs-force-community/go-jsonrpc v0.1.7-0.20230220074347-8db78dbc20d4 h1:iu/3irYevdNpdc0B/gRi1vuS3+lRn+6Ro9G0FeBiAfE=
github.com/ipfs-force-community/go-jsonrpc v0.1.7-0.20230220074347-8db78dbc20d4/go.mod h1:jBSvPTl8V1N7gSTuCR4bis8wnQnIjHbRPpROol6iQKM=
github.com/ipfs-force-community/venus-cluster-assets v0.1.0 h1:K/0+OV9Jm7HjSa7O9MAtgfLDIudQYZUTymhJsp8rGXg=
Expand Down
6 changes: 5 additions & 1 deletion damocles-manager/modules/impl/mock/sealer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
chainAPI "github.com/ipfs-force-community/damocles/damocles-manager/pkg/chain"
"github.com/ipfs-force-community/damocles/damocles-manager/pkg/objstore"
"github.com/ipfs-force-community/damocles/damocles-manager/ver"
storeMiner "github.com/ipfs-force-community/damocles/manager-plugin/objstore/miner"
)

var _ core.SealerAPI = (*Sealer)(nil)
Expand Down Expand Up @@ -320,7 +321,10 @@ func (s *Sealer) Version(context.Context) (string, error) {
return ver.VersionStr(), nil
}

func (s *Sealer) StoreUri(ctx context.Context, storeName string, resource string) (string, error) {
func (s *Sealer) StoreUri(ctx context.Context, storeName, resource string, minerID uint64) (string, error) {
ctx = storeMiner.NewContext(ctx, &storeMiner.MinerMeta{
ID: minerID,
})
store, err := s.persistedStoreManager.GetInstance(ctx, storeName)
if err != nil {
return "", err
Expand Down
4 changes: 4 additions & 0 deletions damocles-manager/modules/impl/sectors/proving.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/ipfs-force-community/damocles/damocles-manager/modules"
chainAPI "github.com/ipfs-force-community/damocles/damocles-manager/pkg/chain"
"github.com/ipfs-force-community/damocles/damocles-manager/pkg/objstore"
storeMiner "github.com/ipfs-force-community/damocles/manager-plugin/objstore/miner"
"github.com/ipfs/go-cid"
)

Expand Down Expand Up @@ -45,6 +46,9 @@ type Proving struct {
}

func (p *Proving) SingleProvable(ctx context.Context, postProofType abi.RegisteredPoStProof, sref core.SectorRef, upgrade bool, locator core.SectorLocator, strict, stateCheck bool) error {
ctx = storeMiner.NewContext(ctx, &storeMiner.MinerMeta{
ID: uint64(sref.ID.Miner),
})
ssize, err := sref.ProofType.SectorSize()
if err != nil {
return fmt.Errorf("get sector size: %w", err)
Expand Down
11 changes: 7 additions & 4 deletions damocles-manager/modules/impl/sectors/snapup_commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/ipfs-force-community/damocles/damocles-manager/pkg/chain"
"github.com/ipfs-force-community/damocles/damocles-manager/pkg/messager"
"github.com/ipfs-force-community/damocles/damocles-manager/pkg/objstore"
storeMiner "github.com/ipfs-force-community/damocles/manager-plugin/objstore/miner"
)

func NewSnapUpCommitter(
Expand Down Expand Up @@ -591,8 +592,10 @@ func (h *snapupCommitHandler) cleanupForSector() error {
ID: h.state.ID,
ProofType: h.state.SectorType,
}

privateInfo, err := h.committer.tracker.SinglePrivateInfo(h.committer.ctx, sref, false, nil)
ctx := storeMiner.NewContext(h.committer.ctx, &storeMiner.MinerMeta{
ID: uint64(sref.ID.Miner),
})
privateInfo, err := h.committer.tracker.SinglePrivateInfo(ctx, sref, false, nil)
if err != nil {
return fmt.Errorf("get private info from tracker: %w", err)
}
Expand All @@ -613,7 +616,7 @@ func (h *snapupCommitHandler) cleanupForSector() error {

for ti := range cleanupTargets {
storeInstance := cleanupTargets[ti].storeInstance
store, err := h.committer.indexer.StoreMgr().GetInstance(h.committer.ctx, storeInstance)
store, err := h.committer.indexer.StoreMgr().GetInstance(ctx, storeInstance)
if err != nil {
return fmt.Errorf("get store instance %s: %w", storeInstance, err)
}
Expand All @@ -623,7 +626,7 @@ func (h *snapupCommitHandler) cleanupForSector() error {
for fi := range fileURIs {
uri := fileURIs[fi]
errwg.Go(func() error {
delErr := store.Del(h.committer.ctx, uri)
delErr := store.Del(ctx, uri)
if delErr == nil {
log.Debugf("CC data cleaned: %s, store: %s", uri, storeInstance)
return nil
Expand Down
4 changes: 4 additions & 0 deletions damocles-manager/modules/impl/sectors/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/ipfs-force-community/damocles/damocles-manager/core"
"github.com/ipfs-force-community/damocles/damocles-manager/modules/util"
"github.com/ipfs-force-community/damocles/damocles-manager/pkg/objstore"
storeMiner "github.com/ipfs-force-community/damocles/manager-plugin/objstore/miner"
)

var _ core.SectorTracker = (*Tracker)(nil)
Expand Down Expand Up @@ -41,6 +42,9 @@ func (t *Tracker) SinglePubToPrivateInfo(ctx context.Context, mid abi.ActorID, s
}

func (t *Tracker) getPrivateInfo(ctx context.Context, sref core.SectorRef, upgrade bool, locator core.SectorLocator) (*sectorStoreInstances, core.PrivateSectorInfo, error) {
ctx = storeMiner.NewContext(ctx, &storeMiner.MinerMeta{
ID: uint64(sref.ID.Miner),
})
objins, err := t.getObjInstanceForSector(ctx, sref.ID, locator, upgrade)
if err != nil {
return nil, core.PrivateSectorInfo{}, fmt.Errorf("get location for %s: %w", util.FormatSectorID(sref.ID), err)
Expand Down
6 changes: 5 additions & 1 deletion damocles-manager/modules/sealer/sealer.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/ipfs-force-community/damocles/damocles-manager/pkg/logging"
"github.com/ipfs-force-community/damocles/damocles-manager/pkg/objstore"
"github.com/ipfs-force-community/damocles/damocles-manager/pkg/piecestore"
storeMiner "github.com/ipfs-force-community/damocles/manager-plugin/objstore/miner"
)

var (
Expand Down Expand Up @@ -834,7 +835,10 @@ func (s *Sealer) AcquireUnsealDest(ctx context.Context, sid abi.SectorID, pieceC
return s.unseal.AcquireDest(ctx, sid, pieceCid)
}

func (s *Sealer) StoreUri(ctx context.Context, storeName string, resource string) (string, error) {
func (s *Sealer) StoreUri(ctx context.Context, storeName, resource string, minerID uint64) (string, error) {
ctx = storeMiner.NewContext(ctx, &storeMiner.MinerMeta{
ID: minerID,
})
store, err := s.sectorIdxer.StoreMgr().GetInstance(ctx, storeName)
if err != nil {
return "", err
Expand Down
5 changes: 5 additions & 0 deletions damocles-manager/modules/sealer/sealer_cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/ipfs-force-community/damocles/damocles-manager/pkg/kvstore"
"github.com/ipfs-force-community/damocles/damocles-manager/pkg/objstore"
"github.com/ipfs-force-community/damocles/damocles-manager/ver"
storeMiner "github.com/ipfs-force-community/damocles/manager-plugin/objstore/miner"
)

func (s *Sealer) ListSectors(ctx context.Context, ws core.SectorWorkerState, job core.SectorWorkerJob) ([]*core.SectorState, error) {
Expand Down Expand Up @@ -225,6 +226,10 @@ func (s *Sealer) RemoveSector(ctx context.Context, sid abi.SectorID) error {
}
}

ctx = storeMiner.NewContext(ctx, &storeMiner.MinerMeta{
ID: uint64(sid.Miner),
})

dest := s.sectorIdxer.Normal()
if state.Upgraded {
dest = s.sectorIdxer.Upgrade()
Expand Down
21 changes: 16 additions & 5 deletions damocles-worker/src/infra/objstore/filestore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,15 +69,26 @@ impl ObjectStore for FileStore {
self.instance.clone()
}

fn uri(&self, resource_name: &str) -> ObjResult<String> {
let uri = call_rpc! {
self.rpc=>store_uri(
fn uris(
&self,
miner_id: u64,
resources: Vec<String>,
) -> ObjResult<HashMap<String, String>> {
let uris = call_rpc! {
self.rpc=>store_uris(
self.instance(),
resource_name.to_string(),
miner_id,
resources.to_vec(),
)
}
.map_err(|e| ObjectStoreError::Other(e.1))?;
Ok(self.local_path.join(uri).display().to_string())

Ok(uris
.into_iter()
.map(|(resource, uri)| {
(resource, self.local_path.join(uri).display().to_string())
})
.collect())
}

fn readonly(&self) -> bool {
Expand Down
27 changes: 24 additions & 3 deletions damocles-worker/src/infra/objstore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,31 @@ pub trait ObjectStore: Send + Sync {
/// for fs-like stores, this should return an abs path.
/// for other stores, this may return a url, or path part of a url.
///
/// the resource value looks like this:
/// - "cache/sc-02-data-tree-r-last.dat"
fn uri(&self, resource: &str) -> ObjResult<String>;
/// the resource value looks like these:
/// - "cache/s-t010001-101/sc-02-data-tree-r-last-0.dat"
/// - "update-cache/s-t010001-101/sc-02-data-tree-r-last-0.dat"
/// - "sealed/s-t010001-101"
/// - "update/s-t010001-101"
fn uris(
&self,
miner_id: u64,
resources: Vec<String>,
) -> ObjResult<HashMap<String, String>>;

/// if this instance is read-only
fn readonly(&self) -> bool;
}

/// Extension methods of Objstore
pub trait ObjectStoreExt {
/// Get a single uri
fn uri(&self, miner_id: u64, resource: &str) -> ObjResult<String>;
}

impl<T: ObjectStore + ?Sized> ObjectStoreExt for T {
fn uri(&self, miner_id: u64, resource: &str) -> ObjResult<String> {
let mut res = self.uris(miner_id, vec![resource.to_string()])?;
res.remove(resource)
.ok_or_else(|| ObjectStoreError::IO(io::ErrorKind::NotFound.into()))
}
}
9 changes: 5 additions & 4 deletions damocles-worker/src/rpc/sealer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -617,10 +617,11 @@ pub trait Sealer {
error_reason: String,
) -> Result<()>;

#[rpc(name = "Venus.StoreUri")]
fn store_uri(
#[rpc(name = "Venus.StoreUris")]
fn store_uris(
&self,
store_name: String,
resources: String,
) -> Result<String>;
miner_id: u64,
resources: Vec<String>,
) -> Result<HashMap<String, String>>;
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use crate::{
},
},
types::SIZE_32G,
SealProof,
SealProof, objstore::ObjectStoreExt,
};

use super::task::Task;
Expand Down Expand Up @@ -582,6 +582,8 @@ pub(crate) fn persist_sector_files(
.map(|fname| cache_dir.join(fname)),
);

let miner_id = sector_id.miner;

let transfer_routes = wanted
.into_iter()
.map(|p| {
Expand All @@ -597,7 +599,7 @@ pub(crate) fn persist_sector_files(
dest: TransferItem::Store {
store: ins_name.clone(),
uri: persist_store
.uri(rel_path)
.uri(miner_id, rel_path)
.with_context(|| format!("get uri for {:?}", rel_path))
.perm()?,
resource_name: rel_path.to_string(),
Expand Down
8 changes: 5 additions & 3 deletions damocles-worker/src/sealing/sealing_thread/planner/snapup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use super::{
},
plan, PlannerTrait, PLANNER_NAME_SNAPUP,
};
use crate::infra::objstore::ObjectStoreExt;
use crate::logging::{debug, warn};
use crate::rpc::sealer::{
AcquireDealsSpec, AllocateSectorSpec, AllocateSnapUpSpec,
Expand Down Expand Up @@ -221,8 +222,9 @@ impl<'t> SnapUp<'t> {
})
.crit()?;

let cache_dir = self.task.cache_dir(sector_id);
let miner_id = sector_id.miner;

let cache_dir = self.task.cache_dir(sector_id);
let cached_file_routes = cached_filenames_for_sector(proof_type.into())
.into_iter()
.map(|fname| {
Expand All @@ -242,7 +244,7 @@ impl<'t> SnapUp<'t> {
src: TransferItem::Store {
store: access_instance.clone(),
uri: access_store
.uri(cached_rel)
.uri(miner_id, cached_rel)
.with_context(|| {
format!(
"get uri for cache dir {:?} in {}",
Expand All @@ -265,7 +267,7 @@ impl<'t> SnapUp<'t> {
src: TransferItem::Store {
store: access_instance.clone(),
uri: access_store
.uri(sealed_rel)
.uri(miner_id, sealed_rel)
.with_context(|| {
format!(
"get uri for sealed file {:?} in {}",
Expand Down
9 changes: 5 additions & 4 deletions damocles-worker/src/sealing/sealing_thread/planner/unseal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use super::{
common::{event::Event, sector::State, task::Task},
plan, PlannerTrait, PLANNER_NAME_UNSEAL,
};
use crate::logging::warn;
use crate::{logging::warn, objstore::ObjectStoreExt};
use crate::rpc::sealer::AllocateSectorSpec;
use crate::sealing::failure::*;
use anyhow::{anyhow, Context, Result};
Expand Down Expand Up @@ -178,8 +178,9 @@ impl<'t> Unseal<'t> {
})
.crit()?;

let miner_id = sector_id.miner;
let sealed_path = instance
.uri(sealed_rel)
.uri(miner_id, sealed_rel)
.with_context(|| {
format!(
"get uri for sealed file {:?} in {}",
Expand All @@ -188,7 +189,7 @@ impl<'t> Unseal<'t> {
})
.perm()?;
let cache_path = instance
.uri(cache_rel)
.uri(miner_id, cache_rel)
.with_context(|| {
format!(
"get uri for cache file {:?} in {}",
Expand Down Expand Up @@ -416,7 +417,7 @@ impl<'t> Unseal<'t> {
),
dest: TransferItem::Store {
store: ins_name.clone(),
uri: access_store.uri(des_path).perm()?,
uri: access_store.uri(sector_id.miner, des_path).perm()?,
resource_name: des_path.to_string(),
},
opt: None,
Expand Down
Loading

0 comments on commit 382457c

Please sign in to comment.