initial commit
This commit is contained in:
commit
5bf1599611
|
@ -0,0 +1,4 @@
|
|||
fls-*
|
||||
*.exe
|
||||
meta.json
|
||||
shard.*
|
|
@ -0,0 +1,10 @@
|
|||
.PHONY: all build test
|
||||
|
||||
all: build
|
||||
|
||||
build:
|
||||
GOOS=windows GOARCH=amd64 go build -o fls-windows-amd64.exe .
|
||||
GOOS=linux GOARCH=amd64 go build -o fls-linux-amd64 .
|
||||
|
||||
test:
|
||||
go test git.keganmyers.com/terribleplan/file-store/...
|
|
@ -0,0 +1,36 @@
|
|||
# FiLeStore
|
||||
|
||||
fls is a tool for easily, efficiently, and reliably storing your files across a pool of multiple disks, servers, racks, zones, regions, or even datacenters.
|
||||
|
||||
# What is the state of the project?
|
||||
|
||||
This project is very early in its development. It is no more than an experiment at this point. It lacks many features to make it useful, and many more features that would make it "good".
|
||||
|
||||
Do not use it to store any data you care about.
|
||||
|
||||
## TODO
|
||||
|
||||
- Chunk file validation
|
||||
- Chunk file repair/rebuilding
|
||||
- Input file reconstruction (with data validation)
|
||||
- Input file reconstruction (with missing chunk files/shards, without rebuilding)
|
||||
- Networking features
|
||||
- Chunk storage
|
||||
- Tracking health of stored chunks
|
||||
- Rebuilding lost chunks
|
||||
- Balancing of chunks
|
||||
- Filesystem features
|
||||
- FUSE mount of network filesystem
|
||||
|
||||
## DONE
|
||||
|
||||
- Chunk file generation (data + parity)
|
||||
- Input file reconstruction (requires all data chunks, does not validate reconstructed data)
|
||||
|
||||
# How does it work?
|
||||
|
||||
Files are striped (with a configurable stripe width, 10MiB by default) across a configurable number of data chunks (10 by default) and parity chunks (4 by default) are generated with Reed-Solomon erasure encoding. Chunks can be stored anywhere you can put a file. If the shards are distributed on enough disks/servers/whatever it is possible to recover from the loss of up to the number of parity chunks (by default you can lose any of up to 4 data or parity chunk files while maintaining data availability).
|
||||
|
||||
# Why?
|
||||
|
||||
For fun, and to solve a specific problem I have with existing options for distributed replicated file systems: some are overly complex, some are difficult to administer, some scale poorly, some don't have adequate data integrity features, and some require full file replication. The primary goal of this project is reliable file storage, and hopefully all of these shortcomings and more will be addressed for this specific problem space.
|
|
@ -0,0 +1,62 @@
|
|||
package cmd
|
||||
|
||||
import (
|
||||
"github.com/spf13/cobra"
|
||||
|
||||
sf "git.keganmyers.com/terribleplan/file-store/pkg/storefile"
|
||||
)
|
||||
|
||||
// Flag storage shared by the ee subcommands.
// NOTE(review): ee_Name is bound to --name on `ee encode` and to --output on
// `ee decode`; the two subcommands never run together so sharing is safe,
// but the name is misleading for decode — confirm before reusing elsewhere.
var (
	ee_Size      int32  // --size: stripe/shard size in bytes (encode)
	ee_Shards    uint16 // --shards: number of data shards (encode)
	ee_Parity    uint16 // --parity: number of parity shards (encode)
	ee_Name      string // --name (encode) / --output (decode)
	ee_Overwrite bool   // --overwrite: replace an existing output file (decode)
)
|
||||
|
||||
func init() {
|
||||
root.AddCommand(ee)
|
||||
|
||||
ee_encode.Flags().Int32Var(&ee_Size, "size", 10*1024*1024, "Size (in bytes) of each shard (default: 10485760, 10 MiB)")
|
||||
ee_encode.Flags().Uint16Var(&ee_Shards, "shards", 10, "Number of shards to generate for file (default: 10)")
|
||||
ee_encode.Flags().Uint16Var(&ee_Parity, "parity", 4, "Number of parity records to generate for file (default: 4)")
|
||||
ee_encode.Flags().StringVar(&ee_Name, "name", "", "Name to store with the file (default: file name, without path)")
|
||||
ee.AddCommand(ee_encode)
|
||||
|
||||
ee_decode.Flags().StringVar(&ee_Name, "output", "", "Path to store with the file (default: path in meta.json, relative to cwd)")
|
||||
ee_decode.Flags().BoolVar(&ee_Overwrite, "overwrite", false, "Whether to overwrite any existing output file (default: false)")
|
||||
ee.AddCommand(ee_decode)
|
||||
}
|
||||
|
||||
// ee is the parent command that groups the erasure-encoding subcommands
// (encode, decode); it performs no work of its own.
var ee = &cobra.Command{
	Use:   "ee",
	Short: "Do things with erasure encoding",
}
|
||||
|
||||
var ee_encode = &cobra.Command{
|
||||
Use: "encode [file] [output-folder]",
|
||||
Short: "Erasure encode a file",
|
||||
Long: `Erasure encode a file.
|
||||
|
||||
--shards and --parity controls output size and redundancy.
|
||||
Output size is file_size * (shards + parity / shards).
|
||||
Up to 'parity' shards can be lost while maintaining the ability to read/recover the file.`,
|
||||
Args: cobra.ExactArgs(2),
|
||||
Run: func(cmd *cobra.Command, args []string) {
|
||||
if err := sf.StoreFile(args[0], args[1], ee_Size, ee_Shards, ee_Parity, ee_Name); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
},
|
||||
}
|
||||
|
||||
// ee_decode reconstructs the original file from an encoded folder via
// sf.ReadFile. ee_Name here carries the --output flag (destination path),
// and ee_Overwrite controls whether an existing file may be replaced.
var ee_decode = &cobra.Command{
	Use:   "decode [encoded-folder]",
	Short: "Erasure decode a file",
	Long:  `Erasure decode a file.`,
	Args:  cobra.ExactArgs(1),
	Run: func(cmd *cobra.Command, args []string) {
		if err := sf.ReadFile(args[0], ee_Name, ee_Overwrite); err != nil {
			panic(err)
		}
	},
}
|
|
@ -0,0 +1,29 @@
|
|||
package cmd
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
|
||||
"github.com/spf13/cobra"
|
||||
)
|
||||
|
||||
// root is the top-level fls command; it currently does nothing by itself and
// exists to host subcommands (see cmd/ee.go).
var root = &cobra.Command{
	Use:   "fls",
	Short: "FiLe Store stores files efficiently across many servers",
	Run: func(cmd *cobra.Command, args []string) {
		// Do Stuff Here
	},
}
|
||||
|
||||
func Execute() {
|
||||
if err := root.Execute(); err != nil {
|
||||
fmt.Fprintln(os.Stderr, err)
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
|
||||
// init is intentionally empty for now; configuration loading would be hooked
// up here via cobra.OnInitialize when needed.
func init() {
	// todo: anything to init?
	// cobra.OnInitialize(initConfig)

}
|
|
@ -0,0 +1,15 @@
|
|||
module git.keganmyers.com/terribleplan/file-store
|
||||
|
||||
go 1.19
|
||||
|
||||
require (
|
||||
github.com/klauspost/reedsolomon v1.10.0
|
||||
github.com/spf13/cobra v1.5.0
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/inconshreveable/mousetrap v1.0.0 // indirect
|
||||
github.com/klauspost/cpuid/v2 v2.0.14 // indirect
|
||||
github.com/mattn/go-sqlite3 v1.14.15 // indirect
|
||||
github.com/spf13/pflag v1.0.5 // indirect
|
||||
)
|
|
@ -0,0 +1,16 @@
|
|||
github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
|
||||
github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM=
|
||||
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
|
||||
github.com/klauspost/cpuid/v2 v2.0.14 h1:QRqdp6bb9M9S5yyKeYteXKuoKE4p0tGlra81fKOpWH8=
|
||||
github.com/klauspost/cpuid/v2 v2.0.14/go.mod h1:g2LTdtYhdyuGPqyWyv7qRAmj1WBqxuObKfj5c0PQa7c=
|
||||
github.com/klauspost/reedsolomon v1.10.0 h1:MonMtg979rxSHjwtsla5dZLhreS0Lu42AyQ20bhjIGg=
|
||||
github.com/klauspost/reedsolomon v1.10.0/go.mod h1:qHMIzMkuZUWqIh8mS/GruPdo3u0qwX2jk/LH440ON7Y=
|
||||
github.com/mattn/go-sqlite3 v1.14.15 h1:vfoHhTN1af61xCRSWzFIWzx2YskyMTwHLrExkBOjvxI=
|
||||
github.com/mattn/go-sqlite3 v1.14.15/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg=
|
||||
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
|
||||
github.com/spf13/cobra v1.5.0 h1:X+jTBEBqF0bHN+9cSMgmfuvv2VHJ9ezmFNf9Y/XstYU=
|
||||
github.com/spf13/cobra v1.5.0/go.mod h1:dWXEIy2H428czQCjInthrTRUg7yKbok+2Qi/yBIJoUM=
|
||||
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
|
||||
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
|
|
@ -0,0 +1,9 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"git.keganmyers.com/terribleplan/file-store/cmd"
|
||||
)
|
||||
|
||||
// main delegates all work to the cmd package's cobra root command.
func main() {
	cmd.Execute()
}
|
|
@ -0,0 +1,100 @@
|
|||
package erasureencode
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"io"
|
||||
)
|
||||
|
||||
// ReadPlanner produces the ordered list of shard reads needed to reconstruct
// a file from its chunk streams.
type ReadPlanner func(meta *EEMeta) []ChunkShardMeta

// ReadHandler is invoked once per completed shard read, in plan order;
// readNum is the index of the read within the plan. The data slice is reused
// between calls and must not be retained.
type ReadHandler func(data []byte, plan ChunkShardMeta, readNum int) error

// decodeFn is the shared read loop behind Decode and DecodeAndValidate: it
// seeks each input chunk to its starting offset, then performs every read in
// the plan sequentially and hands the bytes to handleRead.
// NOTE(review): the file parameter is unused here — output happens inside the
// handleRead closures supplied by the callers.
func decodeFn(inputs []io.ReadSeeker, file io.Writer, meta *EEMeta, getPlan ReadPlanner, handleRead ReadHandler) error {
	raw := []byte{}
	rawLen := int32(0)
	fullPlan := getPlan(meta)

	// we only need to seek once as the rest of the reads should be linear
	// (the first Shards plan entries touch each chunk exactly once).
	// NOTE(review): min(uint64, uint64) — the builtin requires Go 1.21 but
	// go.mod declares 1.19; presumably a local min helper exists — confirm.
	for _, plan := range fullPlan[0:min(uint64(meta.Params.Shards), uint64(len(fullPlan)))] {
		if _, err := inputs[plan.Chunk].Seek(int64(plan.ChunkOffset), io.SeekStart); err != nil {
			return err
		}
	}

	for i, plan := range fullPlan {
		// reallocate the scratch buffer only when the shard size changes
		// (it only shrinks at the tail of the file)
		if rawLen != plan.Size {
			raw = make([]byte, plan.Size)
			rawLen = plan.Size
		}

		if _, err := io.ReadFull(inputs[plan.Chunk], raw); err != nil {
			return err
		}
		if err := handleRead(raw, plan, i); err != nil {
			return err
		}
	}
	return nil
}
|
||||
|
||||
func Decode(inputs []io.ReadSeeker, file io.Writer, meta *EEMeta) error {
|
||||
return decodeFn(inputs, file, meta, func(meta *EEMeta) []ChunkShardMeta {
|
||||
return meta.Params.Plan(0, meta.Params.Size)
|
||||
}, func(data []byte, _ ChunkShardMeta, _ int) error {
|
||||
if _, err := file.Write(data); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func DecodeAndValidate(inputs []io.ReadSeeker, file io.Writer, meta *EEMeta) error {
|
||||
size := uint64(meta.Params.Size)
|
||||
shards := uint64(meta.Params.Shards)
|
||||
|
||||
// get set up to read meta including the padding
|
||||
validateParams := *meta
|
||||
if size%shards > 0 {
|
||||
validateParams.Size = (size / shards) * (shards + 1)
|
||||
}
|
||||
|
||||
return decodeFn(inputs, file, meta, func(_ *EEMeta) []ChunkShardMeta {
|
||||
return validateParams.Plan(0, validateParams.Size)
|
||||
}, func(data []byte, read ChunkShardMeta, i int) error {
|
||||
actual := sha256sum(data)
|
||||
if !bytes.Equal(actual, meta.ShardHashes[i]) {
|
||||
return fmt.Errorf("shard hash mismatch")
|
||||
}
|
||||
dataLen := uint64(len(data))
|
||||
writeData := data
|
||||
if read.GlobalOffset > size {
|
||||
writeData = nil
|
||||
} else if read.GlobalOffset+dataLen > size {
|
||||
writeData = data[0 : read.GlobalOffset-size]
|
||||
}
|
||||
if writeData != nil {
|
||||
if _, err := file.Write(writeData); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
raw := make([]byte, meta.Params.Stride)
|
||||
rawLen := int32(len(raw))
|
||||
for _, plan := range meta.Params.Plan(0, meta.Params.Size) {
|
||||
if rawLen != plan.Size {
|
||||
raw = make([]byte, plan.Size)
|
||||
rawLen = plan.Size
|
||||
}
|
||||
|
||||
// We can be particularly lazy and ignore Offset since we are reading the full file from the chunks
|
||||
if _, err := io.ReadFull(inputs[plan.Chunk], raw); err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err := file.Write(raw); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
|
@ -0,0 +1,143 @@
|
|||
package erasureencode
|
||||
|
||||
import (
|
||||
// "fmt"
|
||||
"bytes"
|
||||
"io"
|
||||
"testing"
|
||||
)
|
||||
|
||||
var simpleDecodeMeta = &EEMeta{
|
||||
Params: Params{
|
||||
Size: 0,
|
||||
Stride: 2,
|
||||
Shards: 3,
|
||||
Parity: 2,
|
||||
},
|
||||
}
|
||||
|
||||
func TestDecode_EvenStride_Full(t *testing.T) {
|
||||
expected := []byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b, 0x0c, 0x0d, 0x0e, 0x0f, 0x10, 0x11}
|
||||
simpleDecodeMeta.Params.Size = uint64(len(expected))
|
||||
|
||||
inputs := []io.ReadSeeker{
|
||||
bytes.NewReader([]byte{0x00, 0x01, 0x06, 0x07, 0x0c, 0x0d}),
|
||||
bytes.NewReader([]byte{0x02, 0x03, 0x08, 0x09, 0x0e, 0x0f}),
|
||||
bytes.NewReader([]byte{0x04, 0x05, 0x0a, 0x0b, 0x10, 0x11}),
|
||||
}
|
||||
|
||||
output := &bytes.Buffer{}
|
||||
if err := Decode(inputs, output, simpleDecodeMeta); err != nil {
|
||||
t.Errorf(`Error: %s`, err)
|
||||
}
|
||||
|
||||
actual := output.Bytes()
|
||||
if !bytes.Equal(actual, expected) {
|
||||
t.Errorf(`Expected %x to equal %x`, actual, expected)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDecode_EvenStride_Short1(t *testing.T) {
|
||||
expected := []byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b, 0x0c, 0x0d, 0x0e, 0x0f, 0x10}
|
||||
simpleDecodeMeta.Params.Size = uint64(len(expected))
|
||||
|
||||
inputs := []io.ReadSeeker{
|
||||
bytes.NewReader([]byte{0x00, 0x01, 0x06, 0x07, 0x0c, 0x0d}),
|
||||
bytes.NewReader([]byte{0x02, 0x03, 0x08, 0x09, 0x0e, 0x0f}),
|
||||
bytes.NewReader([]byte{0x04, 0x05, 0x0a, 0x0b, 0x10, 0x00}),
|
||||
}
|
||||
|
||||
output := &bytes.Buffer{}
|
||||
if err := Decode(inputs, output, simpleDecodeMeta); err != nil {
|
||||
t.Errorf(`Error: %s`, err)
|
||||
}
|
||||
|
||||
actual := output.Bytes()
|
||||
if !bytes.Equal(actual, expected) {
|
||||
t.Errorf(`Expected %x to equal %x`, actual, expected)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDecode_EvenStride_Short2(t *testing.T) {
|
||||
expected := []byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b, 0x0c, 0x0d, 0x0e, 0x0f}
|
||||
simpleDecodeMeta.Params.Size = uint64(len(expected))
|
||||
|
||||
inputs := []io.ReadSeeker{
|
||||
bytes.NewReader([]byte{0x00, 0x01, 0x06, 0x07, 0x0c, 0x0d}),
|
||||
bytes.NewReader([]byte{0x02, 0x03, 0x08, 0x09, 0x0e, 0x0f}),
|
||||
bytes.NewReader([]byte{0x04, 0x05, 0x0a, 0x0b, 0x00, 0x00}),
|
||||
}
|
||||
|
||||
output := &bytes.Buffer{}
|
||||
if err := Decode(inputs, output, simpleDecodeMeta); err != nil {
|
||||
t.Errorf(`Error: %s`, err)
|
||||
}
|
||||
|
||||
actual := output.Bytes()
|
||||
if !bytes.Equal(actual, expected) {
|
||||
t.Errorf(`Expected %x to equal %x`, actual, expected)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDecode_OddStride_Full(t *testing.T) {
|
||||
expected := []byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b, 0x0c, 0x0d, 0x0e}
|
||||
simpleDecodeMeta.Params.Size = uint64(len(expected))
|
||||
|
||||
inputs := []io.ReadSeeker{
|
||||
bytes.NewReader([]byte{0x00, 0x01, 0x06, 0x07, 0x0c}),
|
||||
bytes.NewReader([]byte{0x02, 0x03, 0x08, 0x09, 0x0d}),
|
||||
bytes.NewReader([]byte{0x04, 0x05, 0x0a, 0x0b, 0x0e}),
|
||||
}
|
||||
|
||||
output := &bytes.Buffer{}
|
||||
if err := Decode(inputs, output, simpleDecodeMeta); err != nil {
|
||||
t.Errorf(`Error: %s`, err)
|
||||
}
|
||||
|
||||
actual := output.Bytes()
|
||||
if !bytes.Equal(actual, expected) {
|
||||
t.Errorf(`Expected %x to equal %x`, actual, expected)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDecode_OddStride_Short1(t *testing.T) {
|
||||
expected := []byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b, 0x0c, 0x0d}
|
||||
simpleDecodeMeta.Params.Size = uint64(len(expected))
|
||||
|
||||
inputs := []io.ReadSeeker{
|
||||
bytes.NewReader([]byte{0x00, 0x01, 0x06, 0x07, 0x0c}),
|
||||
bytes.NewReader([]byte{0x02, 0x03, 0x08, 0x09, 0x0d}),
|
||||
bytes.NewReader([]byte{0x04, 0x05, 0x0a, 0x0b, 0x00}),
|
||||
}
|
||||
|
||||
output := &bytes.Buffer{}
|
||||
if err := Decode(inputs, output, simpleDecodeMeta); err != nil {
|
||||
t.Errorf(`Error: %s`, err)
|
||||
}
|
||||
|
||||
actual := output.Bytes()
|
||||
if !bytes.Equal(actual, expected) {
|
||||
t.Errorf(`Expected %x to equal %x`, actual, expected)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDecode_OddStride_Short2(t *testing.T) {
|
||||
expected := []byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b, 0x0c}
|
||||
simpleDecodeMeta.Params.Size = uint64(len(expected))
|
||||
|
||||
inputs := []io.ReadSeeker{
|
||||
bytes.NewReader([]byte{0x00, 0x01, 0x06, 0x07, 0x0c}),
|
||||
bytes.NewReader([]byte{0x02, 0x03, 0x08, 0x09, 0x00}),
|
||||
bytes.NewReader([]byte{0x04, 0x05, 0x0a, 0x0b, 0x00}),
|
||||
}
|
||||
|
||||
output := &bytes.Buffer{}
|
||||
if err := Decode(inputs, output, simpleDecodeMeta); err != nil {
|
||||
t.Errorf(`Error: %s`, err)
|
||||
}
|
||||
|
||||
actual := output.Bytes()
|
||||
if !bytes.Equal(actual, expected) {
|
||||
t.Errorf(`Expected %x to equal %x`, actual, expected)
|
||||
}
|
||||
}
|
|
@ -0,0 +1,108 @@
|
|||
package erasureencode
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
|
||||
"github.com/klauspost/reedsolomon"
|
||||
)
|
||||
|
||||
func EncodeFile(file *os.File, oututs []io.Writer, stride int32, shards, parity uint16) (*EEMeta, error) {
|
||||
stats, err := file.Stat()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
size := uint64(stats.Size())
|
||||
|
||||
meta := &EEMeta{
|
||||
Params: Params{
|
||||
Size: size,
|
||||
Stride: stride,
|
||||
Shards: shards,
|
||||
Parity: parity,
|
||||
},
|
||||
}
|
||||
|
||||
if err := Encode(file, oututs, meta); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return meta, nil
|
||||
}
|
||||
|
||||
func Encode(file io.Reader, outputs []io.Writer, meta *EEMeta) error {
|
||||
// int(uint16) + int(uint16) != int; should be safe
|
||||
if int(meta.Params.Shards)+int(meta.Params.Parity) != len(outputs) {
|
||||
return fmt.Errorf("expected the number of shards+parity to equal the number of output files provided")
|
||||
}
|
||||
|
||||
// int(uint16), int(uint16), int(int32)
|
||||
enc, err := reedsolomon.New(int(meta.Params.Shards), int(meta.Params.Parity), reedsolomon.WithAutoGoroutines(int(meta.Params.Stride)))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
shards := uint64(meta.Params.Shards)
|
||||
parity := uint64(meta.Params.Parity)
|
||||
|
||||
outputChunkCount := shards + parity
|
||||
|
||||
lastShardChunk := shards - 1
|
||||
endShards := shards + parity
|
||||
|
||||
data := make([][]byte, outputChunkCount)
|
||||
data[0] = []byte{}
|
||||
written := false // track whether the current stripe has been written
|
||||
for shard, csm := range meta.Params.Plan(0, meta.Params.Size) {
|
||||
chunk := uint64(shard) % shards
|
||||
written = false
|
||||
// prepare data slices, shard size only meaningfuly changes at stripe boundary
|
||||
if chunk == 0 {
|
||||
// if int32(len(data[0])) != csm.Size {
|
||||
for i := uint64(0); i < outputChunkCount; i++ {
|
||||
data[i] = make([]byte, csm.Size)
|
||||
}
|
||||
// }
|
||||
}
|
||||
// read the individual shard
|
||||
if _, err := io.ReadFull(file, data[csm.Chunk][0:csm.Size]); err != nil {
|
||||
return err
|
||||
}
|
||||
meta.ShardHashes = append(meta.ShardHashes, sha256sum(data[csm.Chunk]))
|
||||
|
||||
// if we are on the last chunk calculate the parity and write things out
|
||||
if chunk == lastShardChunk {
|
||||
if err := writeChunks(data, outputs, enc, meta, shards, endShards); err != nil {
|
||||
return err
|
||||
}
|
||||
written = true
|
||||
}
|
||||
}
|
||||
|
||||
if !written {
|
||||
if err := writeChunks(data, outputs, enc, meta, shards, endShards); err != nil {
|
||||
return err
|
||||
}
|
||||
written = true
|
||||
}
|
||||
|
||||
meta.ShardMerkle = merkleSha256(meta.ShardHashes)
|
||||
meta.ParityMerkle = merkleSha256(meta.ParityHashes)
|
||||
return nil
|
||||
}
|
||||
|
||||
func writeChunks(data [][]byte, files []io.Writer, enc reedsolomon.Encoder, meta *EEMeta, shards, endShards uint64) error {
|
||||
if err := enc.Encode(data); err != nil {
|
||||
return err
|
||||
}
|
||||
for i := shards; i < endShards; i++ {
|
||||
meta.ParityHashes = append(meta.ParityHashes, sha256sum(data[i]))
|
||||
}
|
||||
for i := 0; i < len(data); i++ {
|
||||
if _, err := files[i].Write(data[i]); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
|
@ -0,0 +1,293 @@
|
|||
package erasureencode
|
||||
|
||||
import (
|
||||
// "fmt"
|
||||
"bytes"
|
||||
"io"
|
||||
"testing"
|
||||
)
|
||||
|
||||
var simpleEncodeMeta = &EEMeta{
|
||||
Params: Params{
|
||||
Size: 0,
|
||||
Stride: 2,
|
||||
Shards: 3,
|
||||
Parity: 2,
|
||||
},
|
||||
}
|
||||
|
||||
func TestEncode_EvenStride_Full(t *testing.T) {
|
||||
file := []byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b, 0x0c, 0x0d, 0x0e, 0x0f, 0x10, 0x11}
|
||||
simpleEncodeMeta.Params.Size = uint64(len(file))
|
||||
|
||||
reader := bytes.NewReader(file)
|
||||
|
||||
outputs := []io.Writer{&bytes.Buffer{}, &bytes.Buffer{}, &bytes.Buffer{}, &bytes.Buffer{}, &bytes.Buffer{}}
|
||||
|
||||
if err := Encode(reader, outputs, simpleEncodeMeta); err != nil {
|
||||
t.Errorf(`Error: %s`, err)
|
||||
}
|
||||
|
||||
actual := outputs[0].(*bytes.Buffer).Bytes()
|
||||
expected := []byte{0x00, 0x01, 0x06, 0x07, 0x0c, 0x0d}
|
||||
if !bytes.Equal(actual, expected) {
|
||||
t.Errorf(`Expected[0] %x to equal %x`, actual, expected)
|
||||
}
|
||||
|
||||
actual = outputs[1].(*bytes.Buffer).Bytes()
|
||||
expected = []byte{0x02, 0x03, 0x08, 0x09, 0x0e, 0x0f}
|
||||
if !bytes.Equal(actual, expected) {
|
||||
t.Errorf(`Expected[1] %x to equal %x`, actual, expected)
|
||||
}
|
||||
|
||||
actual = outputs[2].(*bytes.Buffer).Bytes()
|
||||
expected = []byte{0x04, 0x05, 0x0a, 0x0b, 0x10, 0x11}
|
||||
if !bytes.Equal(actual, expected) {
|
||||
t.Errorf(`Expected[2] %x to equal %x`, actual, expected)
|
||||
}
|
||||
|
||||
unexpected := []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00}
|
||||
|
||||
actual = outputs[3].(*bytes.Buffer).Bytes()
|
||||
if bytes.Equal(actual, unexpected) {
|
||||
t.Errorf(`Expected[3] %x to not equal %x`, actual, unexpected)
|
||||
}
|
||||
|
||||
actual = outputs[4].(*bytes.Buffer).Bytes()
|
||||
if bytes.Equal(actual, unexpected) {
|
||||
t.Errorf(`Expected[4] %x to not equal %x`, actual, unexpected)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEncode_EvenStride_Short1(t *testing.T) {
|
||||
file := []byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b, 0x0c, 0x0d, 0x0e, 0x0f, 0x10}
|
||||
simpleEncodeMeta.Params.Size = uint64(len(file))
|
||||
|
||||
reader := bytes.NewReader(file)
|
||||
|
||||
outputs := []io.Writer{&bytes.Buffer{}, &bytes.Buffer{}, &bytes.Buffer{}, &bytes.Buffer{}, &bytes.Buffer{}}
|
||||
|
||||
if err := Encode(reader, outputs, simpleEncodeMeta); err != nil {
|
||||
t.Errorf(`Error: %s`, err)
|
||||
}
|
||||
|
||||
actual := outputs[0].(*bytes.Buffer).Bytes()
|
||||
expected := []byte{0x00, 0x01, 0x06, 0x07, 0x0c, 0x0d}
|
||||
if !bytes.Equal(actual, expected) {
|
||||
t.Errorf(`Expected[0] %x to equal %x`, actual, expected)
|
||||
}
|
||||
|
||||
actual = outputs[1].(*bytes.Buffer).Bytes()
|
||||
expected = []byte{0x02, 0x03, 0x08, 0x09, 0x0e, 0x0f}
|
||||
if !bytes.Equal(actual, expected) {
|
||||
t.Errorf(`Expected[1] %x to equal %x`, actual, expected)
|
||||
}
|
||||
|
||||
actual = outputs[2].(*bytes.Buffer).Bytes()
|
||||
expected = []byte{0x04, 0x05, 0x0a, 0x0b, 0x10, 0x00}
|
||||
if !bytes.Equal(actual, expected) {
|
||||
t.Errorf(`Expected[2] %x to equal %x`, actual, expected)
|
||||
}
|
||||
|
||||
unexpected := []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00}
|
||||
|
||||
actual = outputs[3].(*bytes.Buffer).Bytes()
|
||||
if bytes.Equal(actual, unexpected) {
|
||||
t.Errorf(`Expected[3] %x to not equal %x`, actual, unexpected)
|
||||
}
|
||||
|
||||
actual = outputs[4].(*bytes.Buffer).Bytes()
|
||||
if bytes.Equal(actual, unexpected) {
|
||||
t.Errorf(`Expected[4] %x to not equal %x`, actual, unexpected)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEncode_EvenStride_Short2(t *testing.T) {
|
||||
file := []byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b, 0x0c, 0x0d, 0x0e, 0x0f}
|
||||
simpleEncodeMeta.Params.Size = uint64(len(file))
|
||||
|
||||
reader := bytes.NewReader(file)
|
||||
|
||||
outputs := []io.Writer{&bytes.Buffer{}, &bytes.Buffer{}, &bytes.Buffer{}, &bytes.Buffer{}, &bytes.Buffer{}}
|
||||
|
||||
if err := Encode(reader, outputs, simpleEncodeMeta); err != nil {
|
||||
t.Errorf(`Error: %s`, err)
|
||||
}
|
||||
|
||||
actual := outputs[0].(*bytes.Buffer).Bytes()
|
||||
expected := []byte{0x00, 0x01, 0x06, 0x07, 0x0c, 0x0d}
|
||||
if !bytes.Equal(actual, expected) {
|
||||
t.Errorf(`Expected[0] %x to equal %x`, actual, expected)
|
||||
}
|
||||
|
||||
actual = outputs[1].(*bytes.Buffer).Bytes()
|
||||
expected = []byte{0x02, 0x03, 0x08, 0x09, 0x0e, 0x0f}
|
||||
if !bytes.Equal(actual, expected) {
|
||||
t.Errorf(`Expected[1] %x to equal %x`, actual, expected)
|
||||
}
|
||||
|
||||
actual = outputs[2].(*bytes.Buffer).Bytes()
|
||||
expected = []byte{0x04, 0x05, 0x0a, 0x0b, 0x00, 0x00}
|
||||
if !bytes.Equal(actual, expected) {
|
||||
t.Errorf(`Expected[2] %x to equal %x`, actual, expected)
|
||||
}
|
||||
|
||||
unexpected := []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00}
|
||||
|
||||
actual = outputs[3].(*bytes.Buffer).Bytes()
|
||||
if bytes.Equal(actual, unexpected) {
|
||||
t.Errorf(`Expected[3] %x to not equal %x`, actual, unexpected)
|
||||
}
|
||||
|
||||
actual = outputs[4].(*bytes.Buffer).Bytes()
|
||||
if bytes.Equal(actual, unexpected) {
|
||||
t.Errorf(`Expected[4] %x to not equal %x`, actual, unexpected)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEncode_OddStride_Full(t *testing.T) {
|
||||
file := []byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b, 0x0c, 0x0d, 0x0e}
|
||||
simpleEncodeMeta.Params.Size = uint64(len(file))
|
||||
|
||||
reader := bytes.NewReader(file)
|
||||
|
||||
outputs := []io.Writer{&bytes.Buffer{}, &bytes.Buffer{}, &bytes.Buffer{}, &bytes.Buffer{}, &bytes.Buffer{}}
|
||||
|
||||
if err := Encode(reader, outputs, simpleEncodeMeta); err != nil {
|
||||
t.Errorf(`Error: %s`, err)
|
||||
}
|
||||
|
||||
actual := outputs[0].(*bytes.Buffer).Bytes()
|
||||
expected := []byte{0x00, 0x01, 0x06, 0x07, 0x0c}
|
||||
if !bytes.Equal(actual, expected) {
|
||||
t.Errorf(`Expected[0] %x to equal %x`, actual, expected)
|
||||
}
|
||||
|
||||
actual = outputs[1].(*bytes.Buffer).Bytes()
|
||||
expected = []byte{0x02, 0x03, 0x08, 0x09, 0x0d}
|
||||
if !bytes.Equal(actual, expected) {
|
||||
t.Errorf(`Expected[1] %x to equal %x`, actual, expected)
|
||||
}
|
||||
|
||||
actual = outputs[2].(*bytes.Buffer).Bytes()
|
||||
expected = []byte{0x04, 0x05, 0x0a, 0x0b, 0x0e}
|
||||
if !bytes.Equal(actual, expected) {
|
||||
t.Errorf(`Expected[2] %x to equal %x`, actual, expected)
|
||||
}
|
||||
|
||||
unexpected := []byte{0x00, 0x00, 0x00, 0x00, 0x00}
|
||||
|
||||
actual = outputs[3].(*bytes.Buffer).Bytes()
|
||||
if len(actual) != len(unexpected) {
|
||||
t.Errorf("Expected[3] actual len to be %d, not %d", len(unexpected), len(actual))
|
||||
}
|
||||
if bytes.Equal(actual, unexpected) {
|
||||
t.Errorf(`Expected[3] %x to not equal %x`, actual, unexpected)
|
||||
}
|
||||
|
||||
actual = outputs[4].(*bytes.Buffer).Bytes()
|
||||
if len(actual) != len(unexpected) {
|
||||
t.Errorf("Expected[4] actual len to be %d, not %d", len(unexpected), len(actual))
|
||||
}
|
||||
if bytes.Equal(actual, unexpected) {
|
||||
t.Errorf(`Expected[4] %x to not equal %x`, actual, unexpected)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEncode_OddStride_Short1(t *testing.T) {
|
||||
file := []byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b, 0x0c, 0x0d}
|
||||
simpleEncodeMeta.Params.Size = uint64(len(file))
|
||||
|
||||
reader := bytes.NewReader(file)
|
||||
|
||||
outputs := []io.Writer{&bytes.Buffer{}, &bytes.Buffer{}, &bytes.Buffer{}, &bytes.Buffer{}, &bytes.Buffer{}}
|
||||
|
||||
if err := Encode(reader, outputs, simpleEncodeMeta); err != nil {
|
||||
t.Errorf(`Error: %s`, err)
|
||||
}
|
||||
|
||||
actual := outputs[0].(*bytes.Buffer).Bytes()
|
||||
expected := []byte{0x00, 0x01, 0x06, 0x07, 0x0c}
|
||||
if !bytes.Equal(actual, expected) {
|
||||
t.Errorf(`Expected[0] %x to equal %x`, actual, expected)
|
||||
}
|
||||
|
||||
actual = outputs[1].(*bytes.Buffer).Bytes()
|
||||
expected = []byte{0x02, 0x03, 0x08, 0x09, 0x0d}
|
||||
if !bytes.Equal(actual, expected) {
|
||||
t.Errorf(`Expected[1] %x to equal %x`, actual, expected)
|
||||
}
|
||||
|
||||
actual = outputs[2].(*bytes.Buffer).Bytes()
|
||||
expected = []byte{0x04, 0x05, 0x0a, 0x0b, 0x00}
|
||||
if !bytes.Equal(actual, expected) {
|
||||
t.Errorf(`Expected[2] %x to equal %x`, actual, expected)
|
||||
}
|
||||
|
||||
unexpected := []byte{0x00, 0x00, 0x00, 0x00, 0x00}
|
||||
|
||||
actual = outputs[3].(*bytes.Buffer).Bytes()
|
||||
if len(actual) != len(unexpected) {
|
||||
t.Errorf("Expected[3] actual len to be %d, not %d", len(unexpected), len(actual))
|
||||
}
|
||||
if bytes.Equal(actual, unexpected) {
|
||||
t.Errorf(`Expected[3] %x to not equal %x`, actual, unexpected)
|
||||
}
|
||||
|
||||
actual = outputs[4].(*bytes.Buffer).Bytes()
|
||||
if len(actual) != len(unexpected) {
|
||||
t.Errorf("Expected[4] actual len to be %d, not %d", len(unexpected), len(actual))
|
||||
}
|
||||
if bytes.Equal(actual, unexpected) {
|
||||
t.Errorf(`Expected[4] %x to not equal %x`, actual, unexpected)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEncode_OddStride_Short2(t *testing.T) {
|
||||
file := []byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b, 0x0c}
|
||||
simpleEncodeMeta.Params.Size = uint64(len(file))
|
||||
|
||||
reader := bytes.NewReader(file)
|
||||
|
||||
outputs := []io.Writer{&bytes.Buffer{}, &bytes.Buffer{}, &bytes.Buffer{}, &bytes.Buffer{}, &bytes.Buffer{}}
|
||||
|
||||
if err := Encode(reader, outputs, simpleEncodeMeta); err != nil {
|
||||
t.Errorf(`Error: %s`, err)
|
||||
}
|
||||
|
||||
actual := outputs[0].(*bytes.Buffer).Bytes()
|
||||
expected := []byte{0x00, 0x01, 0x06, 0x07, 0x0c}
|
||||
if !bytes.Equal(actual, expected) {
|
||||
t.Errorf(`Expected[0] %x to equal %x`, actual, expected)
|
||||
}
|
||||
|
||||
actual = outputs[1].(*bytes.Buffer).Bytes()
|
||||
expected = []byte{0x02, 0x03, 0x08, 0x09, 0x00}
|
||||
if !bytes.Equal(actual, expected) {
|
||||
t.Errorf(`Expected[1] %x to equal %x`, actual, expected)
|
||||
}
|
||||
|
||||
actual = outputs[2].(*bytes.Buffer).Bytes()
|
||||
expected = []byte{0x04, 0x05, 0x0a, 0x0b, 0x00}
|
||||
if !bytes.Equal(actual, expected) {
|
||||
t.Errorf(`Expected[2] %x to equal %x`, actual, expected)
|
||||
}
|
||||
|
||||
unexpected := []byte{0x00, 0x00, 0x00, 0x00, 0x00}
|
||||
|
||||
actual = outputs[3].(*bytes.Buffer).Bytes()
|
||||
if len(actual) != len(unexpected) {
|
||||
t.Errorf("Expected[3] actual len to be %d, not %d", len(unexpected), len(actual))
|
||||
}
|
||||
if bytes.Equal(actual, unexpected) {
|
||||
t.Errorf(`Expected[3] %x to not equal %x`, actual, unexpected)
|
||||
}
|
||||
|
||||
actual = outputs[4].(*bytes.Buffer).Bytes()
|
||||
if len(actual) != len(unexpected) {
|
||||
t.Errorf("Expected[4] actual len to be %d, not %d", len(unexpected), len(actual))
|
||||
}
|
||||
if bytes.Equal(actual, unexpected) {
|
||||
t.Errorf(`Expected[4] %x to not equal %x`, actual, unexpected)
|
||||
}
|
||||
}
|
|
@ -0,0 +1,97 @@
|
|||
package erasureencode
|
||||
|
||||
import (
|
||||
"crypto/sha256"
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
)
|
||||
|
||||
// EEMeta describes one erasure-encoded file: the embedded encoding Params
// plus per-shard integrity hashes and merkle roots for both the data and
// parity shard sets.
type EEMeta struct {
	Params
	Name         string   `json:"name"`
	ShardMerkle  FileHash `json:"shardMerkle"`
	ShardHashes  HashList `json:"shardHashes"`
	ParityMerkle FileHash `json:"parityMerkle"`
	ParityHashes HashList `json:"parityHashes"`
}
|
||||
|
||||
type FileHash []byte
|
||||
|
||||
func (hash FileHash) MarshalJSON() ([]byte, error) {
|
||||
text, err := hash.MarshalText()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return json.Marshal(string(text))
|
||||
}
|
||||
|
||||
func (hash FileHash) MarshalText() ([]byte, error) {
|
||||
return []byte(base64.StdEncoding.EncodeToString(hash)), nil
|
||||
}
|
||||
|
||||
func (hash *FileHash) UnmarshalJSON(data []byte) error {
|
||||
s := ""
|
||||
json.Unmarshal(data, &s)
|
||||
return hash.UnmarshalText([]byte(s))
|
||||
}
|
||||
|
||||
func (hash *FileHash) UnmarshalText(text []byte) error {
|
||||
raw, err := base64.StdEncoding.DecodeString(string(text))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(raw) != sha256.Size {
|
||||
return fmt.Errorf("expected hash to be %d bytes (got %d)", sha256.Size, len(raw))
|
||||
}
|
||||
*hash = raw
|
||||
return nil
|
||||
}
|
||||
|
||||
// HashList is an ordered list of equal-length digests, serialized to
// JSON as a single base64 string of the concatenated bytes.
type HashList [][]byte

// MarshalJSON encodes the concatenated hashes as a JSON base64 string.
func (hashes HashList) MarshalJSON() ([]byte, error) {
	text, err := hashes.MarshalText()
	if err != nil {
		return nil, err
	}
	return json.Marshal(string(text))
}

// MarshalText concatenates the hashes and base64-encodes the result.
// Every hash must have the length of the first entry; a ragged list is
// rejected rather than silently truncated (the previous behavior).
func (hashes HashList) MarshalText() ([]byte, error) {
	hashesLen := len(hashes)
	hashLen := 0
	if hashesLen != 0 {
		hashLen = len(hashes[0])
	}
	data := make([]byte, hashesLen*hashLen)
	for i, hash := range hashes {
		if len(hash) != hashLen {
			return nil, fmt.Errorf("expected hash %d to be %d bytes (got %d)", i, hashLen, len(hash))
		}
		copy(data[i*hashLen:], hash)
	}
	return []byte(base64.StdEncoding.EncodeToString(data)), nil
}

// UnmarshalJSON decodes a JSON string, then the base64 text within it.
func (hashes *HashList) UnmarshalJSON(data []byte) error {
	s := ""
	// Previously this error was ignored, so malformed JSON silently
	// produced an empty hash list.
	if err := json.Unmarshal(data, &s); err != nil {
		return err
	}
	return hashes.UnmarshalText([]byte(s))
}

// UnmarshalText splits the base64-decoded bytes into SHA-256-sized
// entries; the total length must be an exact multiple of sha256.Size.
func (hashes *HashList) UnmarshalText(text []byte) error {
	raw, err := base64.StdEncoding.DecodeString(string(text))
	if err != nil {
		return err
	}
	if len(raw)%sha256.Size != 0 {
		return fmt.Errorf("expected hash list to be multiple of %d bytes", sha256.Size)
	}
	split := make([][]byte, len(raw)/sha256.Size)
	for i := 0; i < len(raw); i += sha256.Size {
		split[i/sha256.Size] = raw[i : i+sha256.Size]
	}
	*hashes = split
	return nil
}
|
|
@ -0,0 +1,38 @@
|
|||
package erasureencode
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
)
|
||||
|
||||
func merkleSha256(inputHashes [][]byte) []byte {
|
||||
CHILD_NUM := 2
|
||||
|
||||
hashesLen := len(inputHashes)
|
||||
if hashesLen == 0 {
|
||||
panic(fmt.Errorf("input must be at least one hash"))
|
||||
}
|
||||
hashLen := len(inputHashes[0])
|
||||
hashes := make([][]byte, hashesLen)
|
||||
copy(hashes, inputHashes)
|
||||
|
||||
for true {
|
||||
for i := 0; i < hashesLen; i += CHILD_NUM {
|
||||
hashData := make([]byte, hashLen*CHILD_NUM)
|
||||
for j := 0; j < CHILD_NUM && i+j < hashesLen; j++ {
|
||||
copy(hashData[j*hashLen:], hashes[i+j])
|
||||
}
|
||||
hashes[i/CHILD_NUM] = sha256sum(hashData)
|
||||
}
|
||||
newLen := hashesLen / CHILD_NUM
|
||||
if hashesLen%CHILD_NUM > 0 {
|
||||
newLen += 1
|
||||
}
|
||||
hashes = hashes[0:newLen]
|
||||
hashesLen = newLen
|
||||
|
||||
if hashesLen == 1 {
|
||||
return hashes[0]
|
||||
}
|
||||
}
|
||||
panic(fmt.Errorf("unexpected code path"))
|
||||
}
|
|
@ -0,0 +1,89 @@
|
|||
package erasureencode
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"crypto/sha256"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestMerkle1(t *testing.T) {
|
||||
i1 := sha256sum([]byte("1input data1"))
|
||||
i2 := make([]byte, sha256.Size)
|
||||
expected := sha256sum(append(i1, i2...))
|
||||
input := [][]byte{i1}
|
||||
actual := merkleSha256(input)
|
||||
|
||||
if !bytes.Equal(actual, expected) {
|
||||
t.Fatalf(`Expected %x to equal %x`, actual, expected)
|
||||
}
|
||||
}
|
||||
|
||||
func TestMerkle2(t *testing.T) {
|
||||
i1 := sha256sum([]byte("1input data1"))
|
||||
i2 := sha256sum([]byte("2input data2"))
|
||||
expected := sha256sum(append(i1, i2...))
|
||||
input := [][]byte{i1, i2}
|
||||
actual := merkleSha256(input)
|
||||
|
||||
if !bytes.Equal(actual, expected) {
|
||||
t.Fatalf(`Expected %x to equal %x`, actual, expected)
|
||||
}
|
||||
}
|
||||
|
||||
func TestMerkle3(t *testing.T) {
|
||||
i1_1 := sha256sum([]byte("1input data1"))
|
||||
i1_2 := sha256sum([]byte("2input data2"))
|
||||
i1 := sha256sum(append(i1_1, i1_2...))
|
||||
|
||||
i2_1 := sha256sum([]byte("3input data3"))
|
||||
i2_2 := make([]byte, sha256.Size)
|
||||
i2 := sha256sum(append(i2_1, i2_2...))
|
||||
|
||||
expected := sha256sum(append(i1, i2...))
|
||||
input := [][]byte{i1_1, i1_2, i2_1}
|
||||
actual := merkleSha256(input)
|
||||
|
||||
if !bytes.Equal(actual, expected) {
|
||||
t.Fatalf(`Expected %x to equal %x`, actual, expected)
|
||||
}
|
||||
}
|
||||
|
||||
func TestMerkle4(t *testing.T) {
|
||||
i1_1 := sha256sum([]byte("1input data1"))
|
||||
i1_2 := sha256sum([]byte("2input data2"))
|
||||
i1 := sha256sum(append(i1_1, i1_2...))
|
||||
|
||||
i2_1 := sha256sum([]byte("3input data3"))
|
||||
i2_2 := sha256sum([]byte("4input data4"))
|
||||
i2 := sha256sum(append(i2_1, i2_2...))
|
||||
|
||||
expected := sha256sum(append(i1, i2...))
|
||||
input := [][]byte{i1_1, i1_2, i2_1, i2_2}
|
||||
actual := merkleSha256(input)
|
||||
|
||||
if !bytes.Equal(actual, expected) {
|
||||
t.Fatalf(`Expected %x to equal %x`, actual, expected)
|
||||
}
|
||||
}
|
||||
|
||||
func TestMerkle5(t *testing.T) {
|
||||
i1_1_1 := sha256sum([]byte("1input data1"))
|
||||
i1_1_2 := sha256sum([]byte("2input data2"))
|
||||
i1_2_1 := sha256sum([]byte("3input data3"))
|
||||
i1_2_2 := sha256sum([]byte("4input data4"))
|
||||
i1_1 := sha256sum(append(i1_1_1, i1_1_2...))
|
||||
i1_2 := sha256sum(append(i1_2_1, i1_2_2...))
|
||||
i1 := sha256sum(append(i1_1, i1_2...))
|
||||
|
||||
i2_1 := sha256sum([]byte("5input data5"))
|
||||
i2_2 := make([]byte, sha256.Size)
|
||||
i2 := sha256sum(append(i2_1, i2_2...))
|
||||
|
||||
expected := sha256sum(append(i1, i2...))
|
||||
input := [][]byte{i1_1, i1_2, i2_1, i2_2}
|
||||
actual := merkleSha256(input)
|
||||
|
||||
if !bytes.Equal(actual, expected) {
|
||||
t.Fatalf(`Expected %x to equal %x`, actual, expected)
|
||||
}
|
||||
}
|
|
@ -0,0 +1,67 @@
|
|||
package erasureencode
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
)
|
||||
|
||||
// Params describes how a file is erasure-encoded: its total Size in
// bytes, the per-shard stripe width (Stride), and the number of data
// (Shards) and parity (Parity) chunks.
type Params struct {
	Size   uint64 `json:"s,string"`
	Stride int32  `json:"t"`
	Shards uint16 `json:"h"`
	Parity uint16 `json:"p"`
}

// ChunkShardMeta locates one contiguous read: Size bytes at ChunkOffset
// within data chunk Chunk, corresponding to GlobalOffset in the
// original file.
type ChunkShardMeta struct {
	Chunk        uint16
	ChunkOffset  uint64
	GlobalOffset uint64
	Size         int32
}

// Plan maps a logical read of size bytes at offset into the sequence of
// per-chunk reads needed to satisfy it, in file order. Full stripes use
// the configured Stride; the final partial stripe uses a narrower
// "odd" stride computed from the leftover bytes. Panics when the
// requested range runs past the end of the file.
func (params Params) Plan(offset, size uint64) []ChunkShardMeta {
	outputs := []ChunkShardMeta{}
	if size == 0 {
		// Nothing to read. This also avoids the (end - 1) underflow
		// below, which previously made a zero-size read at offset 0
		// panic with "attempted read beyond end of file".
		return outputs
	}
	end := offset + size
	// NOTE(review): this bound allows end == params.Size+1; the
	// odd-stride tests rely on reading one byte into the zero padding,
	// so it is deliberately kept as-is.
	if (end - 1) > params.Size {
		panic(fmt.Errorf("attempted read beyond end of file"))
	}
	// Constant geometry for the full stripes.
	shards := uint64(params.Shards)
	baseStride := uint64(params.Stride)
	baseStripeWidth := baseStride * shards

	// Geometry of the final, possibly shorter ("odd") stripe.
	oddStripeOffset := (params.Size / baseStripeWidth) * baseStripeWidth // file offset where the odd stripe begins
	oddChunkOffset := oddStripeOffset / shards                           // chunk offset where odd shards begin
	oddSize := (params.Size - oddStripeOffset)
	oddStride := oddSize / shards
	if oddSize%shards > 0 {
		oddStride += 1
	}

	for offset < end {
		output := ChunkShardMeta{GlobalOffset: offset}

		if offset >= oddStripeOffset {
			// Inside the final short stripe.
			localOffset := offset - oddStripeOffset // position relative to the odd stripe
			output.Chunk = uint16(localOffset / oddStride)
			shardOffset := localOffset % oddStride
			output.ChunkOffset = oddChunkOffset + shardOffset
			output.Size = int32(min(end-offset, oddStride-shardOffset))
		} else {
			// Inside a full stripe.
			shardNum := offset / baseStride // global index of the shard holding offset
			output.Chunk = uint16(shardNum % shards)
			shardOffset := offset % baseStride
			output.ChunkOffset = ((shardNum / shards) * baseStride) + shardOffset
			output.Size = int32(min(end-offset, baseStride-shardOffset))
		}
		if output.Size <= 0 {
			panic(fmt.Errorf("invalid read size"))
		}
		outputs = append(outputs, output)
		offset += uint64(output.Size)
	}
	return outputs
}
|
|
@ -0,0 +1,185 @@
|
|||
package erasureencode
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
)
|
||||
|
||||
// example uses a tiny shard size for testing
|
||||
// ex: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, a, b, c, d, e, f, g, h] ShardSize(2) RS(3,2)
|
||||
// c0: [01, 67, cd] [s0, s3, s6]
|
||||
// c1: [23, 89, ef] [s1, s4, s7]
|
||||
// c2: [45, ab, gh] [s2, s5, s8]
|
||||
// read(7, 7)
|
||||
// desired plan 0,3,1; 1,2,2; 2,2,2; 0,4,2 -> 7; 89; ab; cd
|
||||
|
||||
func TestPlan_EvenStride_Full(t *testing.T) {
|
||||
params := Params{
|
||||
Size: 18,
|
||||
Stride: 2,
|
||||
Shards: 3,
|
||||
Parity: 2,
|
||||
}
|
||||
|
||||
expected := []ChunkShardMeta{
|
||||
ChunkShardMeta{Chunk: 0, ChunkOffset: 3, GlobalOffset: 7, Size: 1},
|
||||
ChunkShardMeta{Chunk: 1, ChunkOffset: 2, GlobalOffset: 8, Size: 2},
|
||||
ChunkShardMeta{Chunk: 2, ChunkOffset: 2, GlobalOffset: 10, Size: 2},
|
||||
ChunkShardMeta{Chunk: 0, ChunkOffset: 4, GlobalOffset: 12, Size: 2},
|
||||
}
|
||||
actual := params.Plan(7, 7)
|
||||
|
||||
if len(actual) != len(expected) {
|
||||
t.Errorf(`Expected output to have length of %x, got %x`, len(expected), len(actual))
|
||||
}
|
||||
|
||||
for i, actualItem := range actual {
|
||||
if fmt.Sprintf("%#v", actualItem) != fmt.Sprintf("%#v", expected[i]) {
|
||||
t.Errorf(`Expected %#v to equal %#v at %d`, actualItem, expected[i], i)
|
||||
} else {
|
||||
// fmt.Printf("ok: %#v\n", actualItem)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// example uses a tiny shard size for testing
|
||||
// ex: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, a, b, c, d, e, f] ShardSize(2) RS(3,2)
|
||||
// c0: [01, 67, cd] [s0, s3, s6]
|
||||
// c1: [23, 89, ef] [s1, s4, s7]
|
||||
// c2: [45, ab, !!] [s2, s5, s8]
|
||||
// read(7, 7)
|
||||
// desired plan 0,3,1; 1,2,2; 2,2,2; 0,4,2 -> 7; 89; ab; cd
|
||||
|
||||
func TestPlan_EvenStride_Short2(t *testing.T) {
|
||||
params := Params{
|
||||
Size: 16,
|
||||
Stride: 2,
|
||||
Shards: 3,
|
||||
Parity: 2,
|
||||
}
|
||||
|
||||
expected := []ChunkShardMeta{
|
||||
ChunkShardMeta{Chunk: 0, ChunkOffset: 3, GlobalOffset: 7, Size: 1},
|
||||
ChunkShardMeta{Chunk: 1, ChunkOffset: 2, GlobalOffset: 8, Size: 2},
|
||||
ChunkShardMeta{Chunk: 2, ChunkOffset: 2, GlobalOffset: 10, Size: 2},
|
||||
ChunkShardMeta{Chunk: 0, ChunkOffset: 4, GlobalOffset: 12, Size: 2},
|
||||
}
|
||||
actual := params.Plan(7, 7)
|
||||
|
||||
if len(actual) != len(expected) {
|
||||
t.Errorf(`Expected output to have length of %x, got %x`, len(expected), len(actual))
|
||||
}
|
||||
|
||||
for i, actualItem := range actual {
|
||||
if fmt.Sprintf("%#v", actualItem) != fmt.Sprintf("%#v", expected[i]) {
|
||||
t.Errorf(`Expected %#v to equal %#v at %d`, actualItem, expected[i], i)
|
||||
} else {
|
||||
// fmt.Printf("ok: %#v\n", actualItem)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// example uses a tiny shard size for testing
|
||||
// ex: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, a, b, c, d, e, f, g] ShardSize(2) RS(3,2)
|
||||
// c0: [01, 67, cd] [s0, s3, s6]
|
||||
// c1: [23, 89, ef] [s1, s4, s7]
|
||||
// c2: [45, ab, g!] [s2, s5, s8]
|
||||
// read(7, 7)
|
||||
// desired plan 0,3,1; 1,2,2; 2,2,2; 0,4,2 -> 7; 89; ab; cd
|
||||
|
||||
func TestPlan_EvenStride_Short1(t *testing.T) {
|
||||
params := Params{
|
||||
Size: 17,
|
||||
Stride: 2,
|
||||
Shards: 3,
|
||||
Parity: 2,
|
||||
}
|
||||
|
||||
expected := []ChunkShardMeta{
|
||||
ChunkShardMeta{Chunk: 0, ChunkOffset: 3, GlobalOffset: 7, Size: 1},
|
||||
ChunkShardMeta{Chunk: 1, ChunkOffset: 2, GlobalOffset: 8, Size: 2},
|
||||
ChunkShardMeta{Chunk: 2, ChunkOffset: 2, GlobalOffset: 10, Size: 2},
|
||||
ChunkShardMeta{Chunk: 0, ChunkOffset: 4, GlobalOffset: 12, Size: 2},
|
||||
}
|
||||
actual := params.Plan(7, 7)
|
||||
|
||||
if len(actual) != len(expected) {
|
||||
t.Errorf(`Expected output to have length of %x, got %x`, len(expected), len(actual))
|
||||
}
|
||||
|
||||
for i, actualItem := range actual {
|
||||
if fmt.Sprintf("%#v", actualItem) != fmt.Sprintf("%#v", expected[i]) {
|
||||
t.Errorf(`Expected %#v to equal %#v at %d`, actualItem, expected[i], i)
|
||||
} else {
|
||||
// fmt.Printf("ok: %#v\n", actualItem)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// example uses a tiny shard size for testing
|
||||
// ex: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, a, b, c, d] ShardSize(2) RS(3,2)
|
||||
// c0: [01, 67, c] [s0, s3, s6]
|
||||
// c1: [23, 89, d] [s1, s4, s7]
|
||||
// c2: [45, ab, !] [s2, s5, s8]
|
||||
// read(7, 7)
|
||||
// desired plan 0,3,1; 1,2,2; 2,2,2; 0,4,1; 1,4,1 -> 7; 89; ab; c; d
|
||||
|
||||
func TestPlan_OddStride_Short1(t *testing.T) {
|
||||
params := Params{
|
||||
Size: 13,
|
||||
Stride: 2,
|
||||
Shards: 3,
|
||||
Parity: 2,
|
||||
}
|
||||
|
||||
expected := []ChunkShardMeta{
|
||||
ChunkShardMeta{Chunk: 0, ChunkOffset: 3, GlobalOffset: 7, Size: 1},
|
||||
ChunkShardMeta{Chunk: 1, ChunkOffset: 2, GlobalOffset: 8, Size: 2},
|
||||
ChunkShardMeta{Chunk: 2, ChunkOffset: 2, GlobalOffset: 10, Size: 2},
|
||||
ChunkShardMeta{Chunk: 0, ChunkOffset: 4, GlobalOffset: 12, Size: 1},
|
||||
ChunkShardMeta{Chunk: 1, ChunkOffset: 4, GlobalOffset: 13, Size: 1},
|
||||
}
|
||||
actual := params.Plan(7, 7)
|
||||
|
||||
if len(actual) != len(expected) {
|
||||
t.Errorf(`Expected output to have length of %x, got %x`, len(expected), len(actual))
|
||||
}
|
||||
|
||||
for i, actualItem := range actual {
|
||||
if fmt.Sprintf("%#v", actualItem) != fmt.Sprintf("%#v", expected[i]) {
|
||||
t.Errorf(`Expected %#v to equal %#v at %d`, actualItem, expected[i], i)
|
||||
} else {
|
||||
// fmt.Printf("ok: %#v\n", actualItem)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestPlan_OddStride_Full(t *testing.T) {
|
||||
params := Params{
|
||||
Size: 14,
|
||||
Stride: 2,
|
||||
Shards: 3,
|
||||
Parity: 2,
|
||||
}
|
||||
|
||||
expected := []ChunkShardMeta{
|
||||
ChunkShardMeta{Chunk: 0, ChunkOffset: 3, GlobalOffset: 7, Size: 1},
|
||||
ChunkShardMeta{Chunk: 1, ChunkOffset: 2, GlobalOffset: 8, Size: 2},
|
||||
ChunkShardMeta{Chunk: 2, ChunkOffset: 2, GlobalOffset: 10, Size: 2},
|
||||
ChunkShardMeta{Chunk: 0, ChunkOffset: 4, GlobalOffset: 12, Size: 1},
|
||||
ChunkShardMeta{Chunk: 1, ChunkOffset: 4, GlobalOffset: 13, Size: 1},
|
||||
}
|
||||
actual := params.Plan(7, 7)
|
||||
|
||||
if len(actual) != len(expected) {
|
||||
t.Errorf(`Expected output to have length of %x, got %x`, len(expected), len(actual))
|
||||
}
|
||||
|
||||
for i, actualItem := range actual {
|
||||
if fmt.Sprintf("%#v", actualItem) != fmt.Sprintf("%#v", expected[i]) {
|
||||
t.Errorf(`Expected %#v to equal %#v at %d`, actualItem, expected[i], i)
|
||||
} else {
|
||||
// fmt.Printf("ok: %#v\n", actualItem)
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,20 @@
|
|||
package erasureencode
|
||||
|
||||
import (
|
||||
"crypto/sha256"
|
||||
)
|
||||
|
||||
// min returns the smallest of one or more uint64 values.
// Panics when called with no arguments.
func min(input ...uint64) uint64 {
	smallest := input[0]
	for _, v := range input[1:] {
		if v < smallest {
			smallest = v
		}
	}
	return smallest
}
|
||||
|
||||
// sha256sum returns the SHA-256 digest of input as a byte slice.
func sha256sum(input []byte) []byte {
	h := sha256.New()
	h.Write(input)
	return h.Sum(nil)
}
|
|
@ -0,0 +1,37 @@
|
|||
package erasureencode
|
||||
|
||||
import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
// TestMin exercises the variadic min helper with the smallest value in
// different positions and with different argument counts.
|
||||
|
||||
func TestMin(t *testing.T) {
|
||||
expected := uint64(61)
|
||||
|
||||
actual := min(91, 111111, 9102, 61)
|
||||
if actual != expected {
|
||||
t.Fatalf(`Expected output to be %d, got %d`, actual, expected)
|
||||
}
|
||||
|
||||
actual = min(61, 39421)
|
||||
if actual != expected {
|
||||
t.Fatalf(`Expected output to be %d, got %d`, actual, expected)
|
||||
}
|
||||
|
||||
actual = min(61)
|
||||
if actual != expected {
|
||||
t.Fatalf(`Expected output to be %d, got %d`, actual, expected)
|
||||
}
|
||||
|
||||
actual = min(1923042, 61, 122)
|
||||
if actual != expected {
|
||||
t.Fatalf(`Expected output to be %d, got %d`, actual, expected)
|
||||
}
|
||||
}
|
|
@ -0,0 +1,153 @@
|
|||
package storefile
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
||||
eenc "git.keganmyers.com/terribleplan/file-store/pkg/erasureencode"
|
||||
)
|
||||
|
||||
func StoreFile(inputPath, outputPath string, size int32, shards, parity uint16, name string) error {
|
||||
if size <= 0 {
|
||||
return fmt.Errorf("size must be greater than 0")
|
||||
}
|
||||
|
||||
// fmt.Printf("starting...\n")
|
||||
inputPath, err := filepath.Abs(inputPath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
inputFile, err := os.Open(inputPath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer inputFile.Close()
|
||||
|
||||
if err := os.MkdirAll(outputPath, 0755); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if dirents, err := os.ReadDir(outputPath); err != nil {
|
||||
return err
|
||||
} else if len(dirents) > 0 {
|
||||
return fmt.Errorf("expected output dir to be empty")
|
||||
}
|
||||
|
||||
outputFileCount := shards + parity
|
||||
outputFiles := make([]*os.File, outputFileCount)
|
||||
for i := uint16(0); i < outputFileCount; i++ {
|
||||
outputFile, err := os.OpenFile(filepath.Join(outputPath, fmt.Sprintf("shard.%04d", i+1)), os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0644)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer outputFile.Close()
|
||||
outputFiles[i] = outputFile
|
||||
}
|
||||
|
||||
outputs := make([]io.Writer, outputFileCount)
|
||||
for i, f := range outputFiles {
|
||||
outputs[i] = f
|
||||
}
|
||||
|
||||
// fmt.Printf("writing shards...\n")
|
||||
meta, err := eenc.EncodeFile(inputFile, outputs, size, shards, parity)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
meta.Name = name
|
||||
if meta.Name == "" {
|
||||
meta.Name = filepath.Base(inputPath)
|
||||
}
|
||||
|
||||
// fmt.Printf("writing meta...\n")
|
||||
metaFile, err := os.OpenFile(filepath.Join(outputPath, "meta.json"), os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0644)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer metaFile.Close()
|
||||
|
||||
enc := json.NewEncoder(metaFile)
|
||||
if err := enc.Encode(meta); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := metaFile.Sync(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// fmt.Printf("done...\n")
|
||||
return nil
|
||||
}
|
||||
|
||||
func ReadFile(inputPath, outputPath string, overwrite bool) error {
|
||||
inputAbs, err := filepath.Abs(inputPath)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
return err
|
||||
}
|
||||
|
||||
metaFile, err := os.OpenFile(filepath.Join(inputAbs, "meta.json"), os.O_RDONLY, 0644)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
return err
|
||||
}
|
||||
defer metaFile.Close()
|
||||
|
||||
meta := &eenc.EEMeta{}
|
||||
dec := json.NewDecoder(metaFile)
|
||||
if err := dec.Decode(meta); err != nil {
|
||||
panic(err)
|
||||
return err
|
||||
}
|
||||
|
||||
// fmt.Printf("%#v", *meta)
|
||||
|
||||
if outputPath == "" {
|
||||
outputPath = meta.Name
|
||||
}
|
||||
|
||||
outputFlags := os.O_WRONLY
|
||||
if overwrite {
|
||||
outputFlags |= os.O_CREATE | os.O_TRUNC
|
||||
} else {
|
||||
outputFlags |= os.O_CREATE | os.O_EXCL
|
||||
}
|
||||
outputAbs, err := filepath.Abs(outputPath)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
return err
|
||||
}
|
||||
outputFile, err := os.OpenFile(outputAbs, outputFlags, 0644)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
return err
|
||||
}
|
||||
|
||||
inputFiles := make([]*os.File, meta.Params.Shards)
|
||||
for i := uint16(0); i < meta.Params.Shards; i++ {
|
||||
inputFile, err := os.OpenFile(filepath.Join(inputPath, fmt.Sprintf("shard.%04d", i+1)), os.O_RDONLY, 0644)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
return err
|
||||
}
|
||||
defer inputFile.Close()
|
||||
inputFiles[i] = inputFile
|
||||
}
|
||||
|
||||
inputs := make([]io.ReadSeeker, meta.Params.Shards)
|
||||
for i, f := range inputFiles {
|
||||
inputs[i] = f
|
||||
}
|
||||
|
||||
if err := eenc.Decode(inputs, outputFile, meta); err != nil {
|
||||
panic(err)
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
|
@ -0,0 +1,10 @@
|
|||
package storefile
|
||||
|
||||
import (
|
||||
"crypto/sha256"
|
||||
)
|
||||
|
||||
// sha256sum returns the SHA-256 digest of input as a byte slice.
func sha256sum(input []byte) []byte {
	h := sha256.New()
	h.Write(input)
	return h.Sum(nil)
}
|
Loading…
Reference in New Issue