Go夜读IPFS预习资料

25次阅读

共计 13015 个字符,预计需要花费 33 分钟才能阅读完成。

IPFS 就其本身,没有用到多少新技术,大多数都是已有技术和思路,但并不能说就没有创新,类似比特币。

分布式哈希表

思路:KadDHT 的节点寻址与内容寻址同构,在 KAD 基于异或来计算逻辑距离的基础上,节点倾向于连接与自己距离更近的节点,存储与自己距离更近的内容 Key 值,并在此之上加入了延时等具体物理距离的考量 (Coral DHT)。引入 S -KadDHT 加入了一定的抗女巫攻击能力,因为生成一个公私钥需要计算难度值。

Git

IPFS 的基础数据结构与组织方式受 Git 影响很深,首先 Git 是一套基于内容寻址的文件系统,git 有 blob,tree,commit,tag 四种对象。blob 对象存储各种类型的文件,当你添加一个 blob 时,会通过 sha1 算法计算它的 hash,以前两位为文件夹,后面的为文件名。

Tree 则可以指向其他 tree 或者 blob 等,关系的链接是用 DAG 来做的,按理说也不可能有环,不可能修改 git 后发现改到原来的一个版本去了。

// 检验加入 blob
$echo "hello,world" > 1.txt
$git hash-object -w 1.txt                                              2d832d9044c698081e59c322d5a2a459da546469
$echo "hello,world." > 1.txt
$git hash-object -w 1.txt                                             ec03970d236871dd491c61683f3074d3038b4f6e
$find .git/objects -type f  // 查看存储的文件,可以看到 hash 直接当成了路径
.git/objects/2d/832d9044c698081e59c322d5a2a459da546469
.git/objects/ec/03970d236871dd491c61683f3074d3038b4f6e

#检验加入 tree
$git update-index --add --cacheinfo 100644 ec03970d236871dd491c61683f3074d3038b4f6e test
$git write-tree
b0de348e9cb3a01b47154a4331678787e29afdff
#加入一个新文件
$echo "new file">new.txt
$git hash-object -w new.txt
fa49b077972391ad58037050f2a75f74e3671e92
$git update-index --add --cacheinfo 100644 fa49b077972391ad58037050f2a75f74e3671e92 test1
$git write-tree
7b452990f69f7751ba805a291549c32dbfeb8716
#对比两棵树的内容
$git cat-file -p b0de348e9cb3a01b47154a4331678787e29afdff
100644 blob ec03970d236871dd491c61683f3074d3038b4f6e    test

#可以看到,两棵树,一个指向一个 hash,一个指向两个 hash,且有一个被两棵树指向,故而是图
#但因为无环,所以为 DAG 有向无环图。$git cat-file -p 7b452990f69f7751ba805a291549c32dbfeb8716
100644 blob ec03970d236871dd491c61683f3074d3038b4f6e    test
100644 blob fa49b077972391ad58037050f2a75f74e3671e92    test1

而 IPFS 也设计了相应的对象,blob、list、tree、commit,同时也用 DAG 来组织。

SFS 自验证文件系统

为了发布动态内容,需要一种映射机制,将某个 name 动态映射到一个 IPFS 哈希。在 HTTP 的世界里,PKI 需要一个可信机构颁发证书来维持,但因为 IPFS 的志向是去中心化,所以不设计可信机构,而采用 SFS 的方案,具体思路为,将公钥以及私钥签名等信息,放到这个动态映射中,即文件名本身包含公钥信息,既可以实现密钥分发。

IPFS 根据此想法设计了 IPNS。

Block

Block 是 IPFS 里一个非常基本的抽象,大部分结构都实现了接口,用于表示 IPFS 中,不可分的最小单位。

// ipfs/go-block-format/blocks.go
// Block provides abstraction for blocks implementations.
type Block interface {RawData() []byte
    Cid() cid.Cid
    String() string
    Loggable() map[string]interface{}}

// A BasicBlock is a singular block of data in ipfs. It implements the Block interface.
type BasicBlock struct {
    cid  cid.Cid
    data []byte}

// ipfs/go-ipfs/core/coreapi/block.go

// 基本的 put 块方法,简单的取一个 hash 当做块的寻址 ID(CID),然后就存起来了。func (api *BlockAPI) Put(ctx context.Context, src io.Reader, opts ...caopts.BlockPutOption) (coreiface.BlockStat, error) {settings, pref, err := caopts.BlockPutOptions(opts...)
    if err != nil {return nil, err}
    // 读取所有数据
    data, err := ioutil.ReadAll(src)
    if err != nil {return nil, err}
    // 求出数据 hash 当成块的 ID
    bcid, err := pref.Sum(data)
    if err != nil {return nil, err}
    // 包装成一个块
    b, err := blocks.NewBlockWithCid(data, bcid)
    if err != nil {return nil, err}

    if settings.Pin {defer api.blockstore.PinLock().Unlock()}
    // 存到磁盘上去
    err = api.blocks.AddBlock(b)
    if err != nil {return nil, err}

    if settings.Pin {api.pinning.PinWithMode(b.Cid(), pin.Recursive)
        if err := api.pinning.Flush(); err != nil {return nil, err}
    }

    return &BlockStat{path: path.IpldPath(b.Cid()), size: len(data)}, nil
}

Object

对象与 DAG 是紧密联合的,可以理解为是 DAG 中元素的另一种抽象

type Link struct {
    Name, Hash string
    Size       uint64
}

type Node struct {Links []Link
    Data  string
}
# 测试,data 为一个有两个子文件夹的文件夹
$ipfs add data -r
added QmXkiQUQ5xkC471K29MsL3tKHb9dsFgXernV9hbr4vDP3C data/1/testobject
added QmREfzcCsuHpzUvWUEzgiqHSNpiKnc18s4RfQXqvW669E8 data/2/1.txt
added QmZFYyMVQMXbis5spJkifRtoPoPz9pNo5NyiVSo9UhWTLa data/2/11.pdf
added QmcN88vyVJwfLRreojfT7qdVezShxF1JMCLQBxd3TwL3uz data/1
added QmV14YGVhYS1WyCvmmTnnFzJF4zC587LMCnufjox5r7zHd data/2
added QmdMwuSVU3NKQUy4YgxKWcAm9z6fWXYb6ampbWC442nBgE data
 8.99 MiB / 9.00 MiB [====================================================]  99.96%
 // 查看 links,可以看到指向两个子文件夹
$ipfs object links QmdMwuSVU3NKQUy4YgxKWcAm9z6fWXYb6ampbWC442nBgE
QmcN88vyVJwfLRreojfT7qdVezShxF1JMCLQBxd3TwL3uz 82      1
QmV14YGVhYS1WyCvmmTnnFzJF4zC587LMCnufjox5r7zHd 9432929 2

DAG

上面说过,ipfs 有 blob、list、tree、commit 等数据类型,通过 DAG 有向无环图链接。还是以上面 add 的文件夹为例。

$ipfs dag get QmdMwuSVU3NKQUy4YgxKWcAm9z6fWXYb6ampbWC442nBgE
{"data":"CAE=",
"links":
    [{"Cid":{"/":"QmcN88vyVJwfLRreojfT7qdVezShxF1JMCLQBxd3TwL3uz"},"Name":"1","Size":82},
    #链接的 hash
    {"Cid":{"/":"QmV14YGVhYS1WyCvmmTnnFzJF4zC587LMCnufjox5r7zHd"},"Name":"2","Size":9432929}
    #链接的另一个 hash
    ]
}
# 再查看其中一个 hash,可以看到指向一个文件 testobject
$ipfs dag get QmcN88vyVJwfLRreojfT7qdVezShxF1JMCLQBxd3TwL3uz
{"data":"CAE=",
"links":
[{"Cid":{"/":"QmXkiQUQ5xkC471K29MsL3tKHb9dsFgXernV9hbr4vDP3C"},"Name":"testobject","Size":26}]}

代码

// Link represents an IPFS Merkle DAG Link between Nodes.
type Link struct {
    // utf string name. should be unique per object
    Name string // utf8

    // cumulative size of target object
    Size uint64

    // multihash of the target object
    Cid cid.Cid
}

DAG 会递归进行,生成一个 node 后,会成为上一层节点的一个 link,直到结束。

ipfs 支持两种 dag,一种是 balanced dag,适合随机访问;另一个种是 trickledag,适合顺序访问

var nd ipld.Node
if adder.Trickle {nd, err = trickle.Layout(db)
} else {nd, err = balanced.Layout(db)
}

以 balanced 为例,内部会递归将数据分割,成为 DAG 上的节点, 下面的 fillNodeRec 函数是递归进行的,并且为了避免块太大,作为中间节点一次存的 hash 也是有限的。

// Each time a DAG of a certain `depth` is filled (because it
    // has reached its maximum capacity of `db.Maxlinks()` per node)
    // extend it by making it a sub-DAG of a bigger DAG with `depth+1`.
    for depth := 1; !db.Done(); depth++ {

        // Add the old `root` as a child of the `newRoot`.
        newRoot := db.NewFSNodeOverDag(ft.TFile)
        newRoot.AddChild(root, fileSize, db)

        // Fill the `newRoot` (that has the old `root` already as child)
        // and make it the current `root` for the next iteration (when
        // it will become "old").
        root, fileSize, err = fillNodeRec(db, newRoot, depth)
        if err != nil {return nil, err}
    }

IPNS

$ipfs add helloworld
added QmVtZPoeiqpREqkpTTNMzXkUt74SgQA4JYMG8zPjMVULby helloworld
12 B / ? [-----------------------------------------------------] 
$ipfs name publish QmVtZPoeiqpREqkpTTNMzXkUt74SgQA4JYMG8zPjMVULby
Published to QmdcFVhY4m3NyYfiMbosr3ZbW9EHCkcBk4xySt4uQozXb6: /ipfs/QmVtZPoeiqpREqkpTTNMzXkUt74SgQA4JYMG8zPjMVULby
#解析 IPNS
$ipfs name resolve QmdcFVhY4m3NyYfiMbosr3ZbW9EHCkcBk4xySt4uQozXb6
/ipfs/QmVtZPoeiqpREqkpTTNMzXkUt74SgQA4JYMG8zPjMVULby
#查看 IPNS 的内容
$ipfs cat /ipns/QmdcFVhY4m3NyYfiMbosr3ZbW9EHCkcBk4xySt4uQozXb6
hello,world
#修改文件内容重新发布
$echo "ipfs" >> helloworld
$ipfs add helloworld
added Qmb8sAnXHQYpuAwCTFmHvgz2P5AGJH4y5uVosSdKzcHSbN helloworld
17 B / 17 B [=====================================================] 100.00%
$ipfs name publish Qmb8sAnXHQYpuAwCTFmHvgz2P5AGJH4y5uVosSdKzcHSbN
Published to QmdcFVhY4m3NyYfiMbosr3ZbW9EHCkcBk4xySt4uQozXb6:
/ipfs/Qmb8sAnXHQYpuAwCTFmHvgz2P5AGJH4y5uVosSdKzcHSbN
#查看 IPNS 的内容
$ipfs cat /ipns/QmdcFVhY4m3NyYfiMbosr3ZbW9EHCkcBk4xySt4uQozXb6
hello,world
ipfs

// 其实就是节点 ID
$ipfs id -f="<id>\n"
QmdcFVhY4m3NyYfiMbosr3ZbW9EHCkcBk4xySt4uQozXb6

看一看代码实现,其实还是比较好理解的

// 基本用处就是签一个名
// Create creates a new IPNS entry and signs it with the given private key.
//
// This function does not embed the public key. If you want to do that, use
// `EmbedPublicKey`.
func Create(sk ic.PrivKey, val []byte, seq uint64, eol time.Time) (*pb.IpnsEntry, error) {entry := new(pb.IpnsEntry)

    entry.Value = val
    typ := pb.IpnsEntry_EOL
    entry.ValidityType = &typ
    entry.Sequence = &seq
    entry.Validity = []byte(u.FormatRFC3339(eol))

    // 签名
    sig, err := sk.Sign(ipnsEntryDataForSig(entry))
    if err != nil {return nil, err}
    entry.Signature = sig

    return entry, nil
}

// 验证 IPNS 项,验证签名
// Validates validates the given IPNS entry against the given public key.
func Validate(pk ic.PubKey, entry *pb.IpnsEntry) error {
    // Check the ipns record signature with the public key
    if ok, err := pk.Verify(ipnsEntryDataForSig(entry), entry.GetSignature()); err != nil || !ok {return ErrSignature}

    eol, err := GetEOL(entry)
    if err != nil {return err}
    if time.Now().After(eol) {return ErrExpiredRecord}
    return nil
}

// 嵌入公钥信息,形成自验证
// EmbedPublicKey embeds the given public key in the given ipns entry. While not
// strictly required, some nodes (e.g., DHT servers) may reject IPNS entries
// that don't embed their public keys as they may not be able to validate them
// efficiently.
func EmbedPublicKey(pk ic.PubKey, entry *pb.IpnsEntry) error {
    // Try extracting the public key from the ID. If we can, *don't* embed
    // it.
    id, err := peer.IDFromPublicKey(pk)
    if err != nil {return err}
    if _, err := id.ExtractPublicKey(); err != peer.ErrNoPublicKey {
        // Either a *real* error or nil.
        return err
    }

    // We failed to extract the public key from the peer ID, embed it in the
    // record.
    pkBytes, err := pk.Bytes()
    if err != nil {return err}
    entry.PubKey = pkBytes
    return nil
}

Kad 寻址

kad 的思路前面讲过了,下面看看核心代码。

// 计算距离的函数
func CommonPrefixLen(a, b ID) int {return ks.ZeroPrefixLen(u.XOR(a, b))
}

// 在 bucket 里寻找距离最近的节点信息
// NearestPeers returns a list of the 'count' closest peers to the given ID
func (rt *RoutingTable) NearestPeers(id ID, count int) []peer.ID {
    // 计算异或距离
    cpl := CommonPrefixLen(id, rt.local)

    // It's assumed that this also protects the buckets.
    rt.tabLock.RLock()

    // Get bucket at cpl index or last bucket
    var bucket *Bucket
    // 确保距离不超过当前桶大小
    if cpl >= len(rt.Buckets) {cpl = len(rt.Buckets) - 1
    }
    // 取出桶
    bucket = rt.Buckets[cpl]

    pds := peerDistanceSorter{peers:  make([]peerDistance, 0, 3*rt.bucketsize),
        target: id,
    }
    // 将节点追加进去
    pds.appendPeersFromList(bucket.list)
    // 如果数量不够,从前后两个桶补充
    if pds.Len() < count {
        // In the case of an unusual split, one bucket may be short or empty.
        // if this happens, search both surrounding buckets for nearby peers
        if cpl > 0 {pds.appendPeersFromList(rt.Buckets[cpl-1].list)
        }
        if cpl < len(rt.Buckets)-1 {pds.appendPeersFromList(rt.Buckets[cpl+1].list)
        }
    }
    rt.tabLock.RUnlock()

    // 进行一次排序
    // Sort by distance to local peer
    pds.sort()

    if count < pds.Len() {pds.peers = pds.peers[:count]
    }

    out := make([]peer.ID, 0, pds.Len())
    for _, p := range pds.peers {out = append(out, p.p)
    }

    return out
}

// 更新节点信息
// Update adds or moves the given peer to the front of its respective bucket
func (rt *RoutingTable) Update(p peer.ID) (evicted peer.ID, err error) {peerID := ConvertPeerID(p)
    // 首先和本地 ID 异或计算距离
    cpl := CommonPrefixLen(peerID, rt.local)

    rt.tabLock.Lock()
    defer rt.tabLock.Unlock()
    bucketID := cpl
    if bucketID >= len(rt.Buckets) {bucketID = len(rt.Buckets) - 1
    }

    bucket := rt.Buckets[bucketID]
    if bucket.Has(p) {
        // If the peer is already in the table, move it to the front.
        // This signifies that it it "more active" and the less active nodes
        // Will as a result tend towards the back of the list
        bucket.MoveToFront(p)
        return "", nil
    }
    
    // 保证时延要求
    if rt.metrics.LatencyEWMA(p) > rt.maxLatency {
        // Connection doesnt meet requirements, skip!
        return "", ErrPeerRejectedHighLatency
    }

    // We have enough space in the bucket (whether spawned or grouped).
    if bucket.Len() < rt.bucketsize {bucket.PushFront(p)
        rt.PeerAdded(p)
        return "", nil
    }

    if bucketID == len(rt.Buckets)-1 {// if the bucket is too large and this is the last bucket (i.e. wildcard), unfold it.
        rt.nextBucket()
        // the structure of the table has changed, so let's recheck if the peer now has a dedicated bucket.
        bucketID = cpl
        if bucketID >= len(rt.Buckets) {bucketID = len(rt.Buckets) - 1
        }
        bucket = rt.Buckets[bucketID]
        if bucket.Len() >= rt.bucketsize {
            // if after all the unfolding, we're unable to find room for this peer, scrap it.
            return "", ErrPeerRejectedNoCapacity
        }
        bucket.PushFront(p)
        rt.PeerAdded(p)
        return "", nil
    }

    return "", ErrPeerRejectedNoCapacity
}

Bitswap

由于 IPFS 相当于一个全局的种子,自然会有一些策略,防止有人搭便车过于严重,于是就有了 bitswap,bitswap 会维护 ledger,记录临近节点的收发情况 (wantlist 与 havelist),来进行决策(bitswap 下的 engine)。

代码比较复杂,异步逻辑过多,粗略的讲,不贴代码了,到时候可以对着代码讲讲。

逻辑如下,当节点想获取一堆块的时候,首先在本地看看有没有,没有进入 bitswap 建立一个 session(会话),然后在 bitswap 的 pub/sub 机制中订阅块(会写进一个块的 channel)。然后将这些块的 cid 加入 wantlist,让 bitswap 的 worker 开始工作,期间会异步进行很多事,首先会定时用 p2p 层的 contentRouting 的 FindProviderAsync 寻找拥有块的节点,然后 bitswap 这里会不断去取 provider 的 ID,并向其发送请求,如果块返回了,检查一下 hash 值检验完整性,然后通过 pub/sub 的 channel 将块 publish,通过 channel 发布过去,然后上层的逻辑,即 channel 的另一端就可以返回结果了。

超越 IPFS- 区块链存储

IPFS 不能解决的问题,少部分节点用爱发电,大部分节点搭便车,没有经济激励,单凭 bitswap 机制,是不可能真正完整的。

P2P 存储在没有区块链的年代,只能用于文件分享,以及被某些视频公司内置用来 CDN(注意,视频网站占了你的上传带宽,但并没有向你支付费用,而你观看视频反而是需要费用的)。

于是,协议实验室提出了 Filecoin 项目,旨在让所有的参与方都有利可图,无论是存储空间还是带宽都可以盈利,用户也能以更低廉的价格获得更安全的服务。

区块链存储相对于其他项目,是最好落地的,因为存储内容和分发内容,在某种意义上是数字的,可以通过数学与密码学验证,而其他的区块链项目,只要和实体经济挂钩,链上链下如何交互是个很难的问题。

IPFS Filecoin
类别 去中心化文件系统(无激励) 去中心化存储网络
功能 基于内容寻址的分布式存储基础设施 IPFS 网络之上的激励层,提供一个云存储领域的自由交易市场
存储权限 对有所有权的 IPFS 节点具备存储权限 1. 除对有所有权的 IPFS 节点具备存储权限外 2. 还可以通过支付的方式,在其供应商的节点之上具备存储权限
读取权限 ALL(只要知道内容 cid) ALL(只要知道内容 cid)

两个市场

存储市场

本质上是一个去中心化的云存储,用户通过 Filecoin 的 Token,去租用存储矿工的存储空间,矿工持续提交存储证明,这里销售购买的是存储空间。

检索市场

可以理解为一个去中心化的 CDN 服务,用户通过 Token,让检索矿工检索数据返回,这里销售购买的是带宽、流量。

存储证明

如何证明矿工存储数据?

如何证明矿工存储了冗余数据?

如何持续地证明矿工存储了数据?

区块链采用何种共识?

基础存储证明(见 PPT)

1,本地存分片哈希

2,本地存 Merkle 树根,将文件分片及 merkle 树发给存储方。

3,利用同态 Hash 函数生成数据片的同态哈希发给存储方。

上面三个为什么不适用 Filecoin?

考虑维度:通信复杂度?计算复杂度?能否抗三种攻击(女巫攻击,生成攻击,外源攻击)?用户和存储方能否合谋,比如利用同态 hash,那存储方可以任意刷数据生成证明抢出块权了。

Filecoin 采用的,一存储方在存储空间有一个 sector(扇区)大小时,对其进行编码,zigzag 深度鲁棒图,一个 sector 内一个文件块的编码可能同时和好几个其他文件块相关,然后做一次 VDE,如此从左到右,从右到左编码多次,每次为一层,z 字形,故成为 zigzag,如果删除了其中一块,由于同时还需要很多其他块,导致生成攻击成本很高,其中用到了 VDE(可验证的延迟编码),最后证明用到了 zksnark 零知识证明。

连续的复制证明即可实现时空证明,所以时空证明与复制证明其实是一起实现的。

但到此,时空证明只有对存储空间的证明,没有设计出块规则,filecoin 采用了类似 VRF 的共识算法(最初白皮书为 POW)。

共识采用类 VRF 的秘密领导人选举,计算一个 input 的 hash,计算其小于某一个 target,但是不同的存储矿工需要的 target 不同,与其占全网占的存储算力成正比,及生成的存储证明占比越高,越可能爆块。所以前期,filecoin 需求不足时,矿工可以自己付费刷数据提高爆块率。

$$
H(<t||rand(t)>_{M_i})/2^L \leq p_i^t/ \sum_j p_j^t
$$

H- 哈希函数

rand(t):时间 t 时的随机数,可用 VDF 生成连续随机数序列。

$M_i$: 对其签名

$2^L$: 让最后的值在 0~100% 之间

另一边是存储算力与全网算力的比值,即占比值越大者机会越大,某种 POS。

代码抽象后,如下,

func Mine(minerKey PrivateKey) {
    for r := range rounds { // for each round
        bestTipset, tickets := ChainTipsMgr.GetBestTipsetAtRound(r - 1)

        ticket := GenerateTicket(minerKey, bestTipset)
        tickets.Append(ticket)

        // Generate an ElectionProof and check if we win
        // Note: even if we don't win, we want to make a ticket
        // in case we need to mine a null round
        win, proof := CheckIfWinnerAtRound(minerKey, r, bestTipset)
        if win {GenerateAndBroadcastBlock(bestTipset, tickets, proof)
        } else {
            // Even if we don't win, add our ticket to the tracker in
            // case we need to mine on it later.
            ChainTipsMgr.AddFailedTicket(bestTipset, tickets)
        }
    }
}

检查自己是不是被选中的。

const RandomnessLookback = 1 // Also referred to as "K" in many places

func CheckIfWinnerAtRound(key PrivateKey, n Integer, parentTipset Tipset) (bool, ElectionProof) {lbt := ChainTipsMgr.TicketFromRound(parentTipset, n-RandomnessLookback)

  eproof := ComputeElectionProof(lbt, key)

  minerPower := GetPower(miner)
  totalPower := state.GetTotalPower()

  if IsProofAWinner(eproof, minerPower, totalPower)
    return True, eproof
  else
    return False, None
}

问题:因为是秘密领导人选举,此共识为预期共识,有可能某一轮无人选中,有可能选中多人(会打包多个区块生成 tipset),目前并不很完美。

正文完
 0