diff --git a/stream/blob.go b/stream/blob.go
index e422bab..fb6c91c 100644
--- a/stream/blob.go
+++ b/stream/blob.go
@@ -35,7 +35,7 @@ func (b Blob) Hash() []byte {
 	return hashBytes[:]
 }
 
-// HashHex returns th blob hash as a hex string
+// HashHex returns the blob hash as a hex string
 func (b Blob) HashHex() string {
 	return hex.EncodeToString(b.Hash())
 }
diff --git a/stream/sdBlob.go b/stream/sdBlob.go
index 557ba6d..9186762 100644
--- a/stream/sdBlob.go
+++ b/stream/sdBlob.go
@@ -8,6 +8,7 @@ import (
 	"encoding/hex"
 	"encoding/json"
 	"strconv"
+	"strings"
 )
 
 const streamTypeLBRYFile = "lbryfile"
@@ -44,10 +45,30 @@ type SDBlob struct {
 	StreamHash []byte `json:"-"`
 }
 
+// Hash returns a hash of the SD blob data
+func (s SDBlob) Hash() []byte {
+	hashBytes := sha512.Sum384(s.ToBlob())
+	return hashBytes[:]
+}
+
+// HashHex returns the SD blob hash as a hex string
+func (s SDBlob) HashHex() string {
+	return hex.EncodeToString(s.Hash())
+}
+
 // ToBlob converts the SDBlob to a normal data Blob
-func (s SDBlob) ToBlob() (Blob, error) {
-	b, err := json.Marshal(s)
-	return Blob(b), err
+func (s SDBlob) ToBlob() Blob {
+	jsonSD, err := json.Marshal(s)
+	if err != nil {
+		panic(err)
+	}
+
+	// COMPATIBILITY HACK to make json output match python's json. this can be
+	// removed when we implement canonical JSON encoding
+	jsonSD = []byte(strings.Replace(string(jsonSD), ",", ", ", -1))
+	jsonSD = []byte(strings.Replace(string(jsonSD), ":", ": ", -1))
+
+	return jsonSD
}
 
 // FromBlob unmarshals a data Blob that should contain SDBlob data
@@ -55,30 +76,6 @@ func (s *SDBlob) FromBlob(b Blob) error {
 	return json.Unmarshal(b, s)
 }
 
-func newSdBlob(blobs []Blob, key []byte, ivs [][]byte, streamName, suggestedFilename string) *SDBlob {
-	if len(ivs) != len(blobs)+1 { // +1 for terminating 0-length blob
-		panic("wrong number of IVs provided")
-	}
-
-	sd := &SDBlob{
-		StreamType:        streamTypeLBRYFile,
-		StreamName:        streamName,
-		SuggestedFileName: suggestedFilename,
-		Key:               key,
-	}
-
-	for i, b := range blobs {
-		sd.addBlob(b, ivs[i])
-	}
-
-	// terminating blob
-	sd.addBlob(Blob{}, ivs[len(ivs)-1])
-
-	sd.updateStreamHash()
-
-	return sd
-}
-
 // addBlob adds the blob's info to stream
 func (s *SDBlob) addBlob(b Blob, iv []byte) {
 	if len(iv) == 0 {
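
For context on the COMPATIBILITY HACK in ToBlob above: Go's encoding/json writes compact separators, while Python's json.dumps defaults to ", " and ": ". Below is a minimal sketch of the rewrite, using invented field names. Note that a blind strings.Replace also touches commas and colons inside string values, which is why the comment calls it a hack and flags canonical JSON encoding as the real fix.

    package main

    import (
        "encoding/json"
        "fmt"
        "strings"
    )

    func main() {
        // invented struct, purely for illustration
        b, _ := json.Marshal(struct {
            Name  string `json:"name"`
            Count int    `json:"count"`
        }{"example", 2})
        fmt.Println(string(b)) // {"name":"example","count":2}

        // the same separator rewrite SDBlob.ToBlob does
        s := strings.Replace(string(b), ",", ", ", -1)
        s = strings.Replace(s, ":", ": ", -1)
        fmt.Println(s) // {"name": "example", "count": 2}, matching python's json.dumps
    }
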
diff --git a/stream/stream.go b/stream/stream.go
index c9aac39..fe31f6b 100644
--- a/stream/stream.go
+++ b/stream/stream.go
@@ -2,8 +2,10 @@ package stream
 
 import (
 	"bytes"
+	"crypto/sha512"
+	"hash"
+	"io"
 	"math"
-	"strings"
 
 	"github.com/lbryio/lbry.go/v2/extras/errors"
 )
@@ -13,66 +15,22 @@ type Stream []Blob
 
 // -1 to leave room for padding, since there must be at least one byte of pkcs7 padding
 const maxBlobDataSize = MaxBlobSize - 1
 
-// New creates a new Stream from a byte slice
-func New(data []byte) (Stream, error) {
-	key := randIV()
-	ivs := make([][]byte, numContentBlobs(data)+1) // +1 for terminating 0-length blob
-	for i := range ivs {
-		ivs[i] = randIV()
-	}
-
-	return makeStream(data, key, ivs, "", "")
-}
-
-// Reconstruct creates a stream from the given data using predetermined IVs and key from the SD blob
-// NOTE: this will assume that all blobs except the last one are at max length. in theory this is not
-// required, but in practice this is always true. if this is false, streams may not match exactly
-func Reconstruct(data []byte, sdBlob SDBlob) (Stream, error) {
-	ivs := make([][]byte, len(sdBlob.BlobInfos))
-	for i := range ivs {
-		ivs[i] = sdBlob.BlobInfos[i].IV
-	}
-
-	return makeStream(data, sdBlob.Key, ivs, sdBlob.StreamName, sdBlob.SuggestedFileName)
-}
-
-func makeStream(data, key []byte, ivs [][]byte, streamName, suggestedFilename string) (Stream, error) {
-	var err error
-
-	numBlobs := numContentBlobs(data)
-	if len(ivs) != numBlobs+1 { // +1 for terminating 0-length blob
-		return nil, errors.Err("incorrect number of IVs provided")
-	}
-
-	s := make(Stream, numBlobs+1) // +1 for sd blob
-	for i := 0; i < numBlobs; i++ {
-		start := i * maxBlobDataSize
-		end := start + maxBlobDataSize
-		if end > len(data) {
-			end = len(data)
-		}
-		s[i+1], err = NewBlob(data[start:end], key, ivs[i])
-		if err != nil {
-			return nil, err
-		}
-	}
-
-	sd := newSdBlob(s[1:], key, ivs, streamName, suggestedFilename)
-	jsonSD, err := sd.ToBlob()
-	if err != nil {
-		return nil, err
-	}
-
-	// COMPATIBILITY HACK to make json output match python's json. this can be
-	// removed when we implement canonical JSON encoding
-	jsonSD = []byte(strings.Replace(string(jsonSD), ",", ", ", -1))
-	jsonSD = []byte(strings.Replace(string(jsonSD), ":", ": ", -1))
-
-	s[0] = jsonSD
-	return s, nil
+// New creates a new Stream from a stream of bytes.
+func New(src io.Reader) (Stream, error) {
+	return NewEncoder(src).Stream()
 }
 
+// Data returns the file data that a stream encapsulates.
+//
+// Deprecated: use Decode() instead. It's a more accurate name. Data() will be removed in the future.
 func (s Stream) Data() ([]byte, error) {
+	return s.Decode()
+}
+
+// Decode returns the file data that a stream encapsulates
+//
+// TODO: this should use io.Writer instead of returning bytes
+func (s Stream) Decode() ([]byte, error) {
 	if len(s) < 2 {
 		return nil, errors.Err("stream must be at least 2 blobs long") // sd blob and content blob
 	}
@@ -124,7 +82,152 @@ func (s Stream) Data() ([]byte, error) {
 	return file, nil
 }
 
-//numContentBlobs returns the number of content blobs required to store the data
-func numContentBlobs(data []byte) int {
-	return int(math.Ceil(float64(len(data)) / float64(maxBlobDataSize)))
+// Encoder reads bytes from a source and returns blobs of the stream
+type Encoder struct {
+	// source data to be encoded into a stream
+	src io.Reader
+	// preset IVs to use for encrypting blobs
+	ivs [][]byte
+	// an optional hint about the total size of the source data
+	// encoder will use this to preallocate space for blobs
+	srcSizeHint int
+
+	// buffer for reading bytes from reader
+	buf []byte
+	// sd blob that gets built as stream is encoded
+	sd *SDBlob
+	// number of bytes read from src
+	srcLen int
+	// running hash of bytes read from src
+	srcHash hash.Hash
+}
+
+// NewEncoder creates a new stream encoder
+func NewEncoder(src io.Reader) *Encoder {
+	return &Encoder{
+		src: src,
+
+		buf: make([]byte, maxBlobDataSize),
+		sd: &SDBlob{
+			StreamType: streamTypeLBRYFile,
+			Key:        randIV(),
+		},
+		srcHash: sha512.New384(),
+	}
+}
+
+// NewEncoderWithIVs creates a new encoder that uses preset cryptographic material
+func NewEncoderWithIVs(src io.Reader, key []byte, ivs [][]byte) *Encoder {
+	e := NewEncoder(src)
+	e.sd.Key = key
+	e.ivs = ivs
+	return e
+}
+
+// NewEncoderFromSD creates a new encoder that reuses cryptographic material from an sd blob.
+// This can be used to reconstruct a stream exactly from a file.
+// NOTE: this will assume that all blobs except the last one are at max length. in theory this is not
+// required, but in practice this is always true. if this is false, streams may not match exactly
+func NewEncoderFromSD(src io.Reader, sdBlob *SDBlob) *Encoder {
+	ivs := make([][]byte, len(sdBlob.BlobInfos))
+	for i := range ivs {
+		ivs[i] = sdBlob.BlobInfos[i].IV
+	}
+
+	e := NewEncoderWithIVs(src, sdBlob.Key, ivs)
+	e.sd.StreamName = sdBlob.StreamName
+	e.sd.SuggestedFileName = sdBlob.SuggestedFileName
+	return e
+}
+
+// TODO: consider making a NewPartialEncoder that also copies blobinfos from sdBlobs and seeks forward in the data
+// this would avoid re-creating blobs that were created in the past
+
+// Next returns the next blob in the stream
+func (e *Encoder) Next() (Blob, error) {
+	n, err := e.src.Read(e.buf)
+	if err != nil {
+		return nil, err
+	}
+
+	e.srcLen += n
+	e.srcHash.Write(e.buf[:n])
+	iv := e.nextIV()
+
+	blob, err := NewBlob(e.buf[:n], e.sd.Key, iv)
+	if err != nil {
+		return nil, err
+	}
+
+	e.sd.addBlob(blob, iv)
+
+	return blob, nil
+}
+
+// Stream creates the whole stream in one call
+func (e *Encoder) Stream() (Stream, error) {
+	s := make(Stream, 1, 1+int(math.Ceil(float64(e.srcSizeHint)/maxBlobDataSize))) // len starts at 1 and cap is +1 to leave room for sd blob
+
+	for {
+		blob, err := e.Next()
+		if err != nil {
+			if !errors.Is(err, io.EOF) {
+				return nil, err
+			}
+
+			// if stream is not terminated, terminate it (guard against an empty source with no blob infos)
+			if len(e.sd.BlobInfos) == 0 || e.sd.BlobInfos[len(e.sd.BlobInfos)-1].Length > 0 {
+				e.sd.addBlob(Blob{}, e.nextIV())
+			}
+
+			break
+		}
+
+		s = append(s, blob)
+	}
+
+	s[0] = e.SDBlob().ToBlob()
+
+	if cap(s) > len(s) {
+		// size hint was too big. copy stream to smaller underlying array to free memory
+		// this might be premature optimization...
+		s = append(Stream(nil), s[:]...)
+	}
+
+	return s, nil
+}
+
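
Next and Stream, just above, support two call patterns: pull blobs one at a time, or build the whole stream in one call. A hypothetical pull-style consumer might look like the sketch below; uploadBlob and the file name are invented, and the import path is inferred from the extras/errors import above. Note that only Stream() appends the terminating empty blob info to the sd blob, so a manual Next loop leaves the sd blob unterminated.

    package main

    import (
        "errors"
        "io"
        "log"
        "os"

        "github.com/lbryio/lbry.go/v2/stream"
    )

    // uploadBlob is a hypothetical stand-in for whatever happens to each blob.
    func uploadBlob(b stream.Blob) error { return nil }

    func main() {
        f, err := os.Open("movie.mp4") // invented input file
        if err != nil {
            log.Fatal(err)
        }
        defer f.Close()

        enc := stream.NewEncoder(f)
        for {
            blob, err := enc.Next()
            if errors.Is(err, io.EOF) {
                break
            }
            if err != nil {
                log.Fatal(err)
            }
            if err := uploadBlob(blob); err != nil {
                log.Fatal(err)
            }
        }
        log.Printf("encoded %d bytes; sd blob hash so far: %s", enc.SourceLen(), enc.SDBlob().HashHex())
    }
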
+// SDBlob returns the sd blob so far
+func (e *Encoder) SDBlob() *SDBlob {
+	e.sd.updateStreamHash()
+	return e.sd
+}
+
+// SourceLen returns the number of bytes read from source
+func (e *Encoder) SourceLen() int {
+	return e.srcLen
+}
+
+// SourceHash returns a hash of the bytes read from source
+func (e *Encoder) SourceHash() []byte {
+	return e.srcHash.Sum(nil)
+}
+
+// SourceSizeHint sets a hint about the total size of the source.
+// This helps allocate RAM more efficiently.
+// If the hint is wrong, it still works fine but there will be a small performance penalty.
+func (e *Encoder) SourceSizeHint(size int) *Encoder {
+	e.srcSizeHint = size
+	return e
+}
+
+// nextIV returns the next preset IV if there is one
+func (e *Encoder) nextIV() []byte {
+	if len(e.ivs) == 0 {
+		return randIV()
+	}
+
+	iv := e.ivs[0]
+	e.ivs = e.ivs[1:]
+	return iv
 }
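
Putting the pieces together, here is a one-call usage sketch of the encoder above, combining SourceSizeHint with Stream() and round-tripping through Decode() as a sanity check. The file name is invented, and the Stat-based size hint is just one way to obtain a size.

    package main

    import (
        "log"
        "os"

        "github.com/lbryio/lbry.go/v2/stream"
    )

    func main() {
        f, err := os.Open("movie.mp4") // invented input file
        if err != nil {
            log.Fatal(err)
        }
        defer f.Close()

        info, err := f.Stat()
        if err != nil {
            log.Fatal(err)
        }

        // the size hint lets Stream() allocate the blob slice once up front
        enc := stream.NewEncoder(f).SourceSizeHint(int(info.Size()))
        s, err := enc.Stream()
        if err != nil {
            log.Fatal(err)
        }

        // round trip: decrypt the stream back into the original bytes
        decoded, err := s.Decode()
        if err != nil {
            log.Fatal(err)
        }
        if len(decoded) != enc.SourceLen() {
            log.Fatal("round-trip length mismatch")
        }
        log.Printf("ok: %d blobs for %d bytes", len(s), len(decoded))
    }
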
diff --git a/stream/stream_test.go b/stream/stream_test.go
index 1daf105..8375787 100644
--- a/stream/stream_test.go
+++ b/stream/stream_test.go
@@ -2,26 +2,28 @@ package stream
 
 import (
 	"bytes"
+	"crypto/rand"
 	"crypto/sha256"
+	"crypto/sha512"
 	"encoding/hex"
 	"testing"
 )
 
-func TestStreamToFile(t *testing.T) {
-	blobHashes := []string{
-		"1bf7d39c45d1a38ffa74bff179bf7f67d400ff57fa0b5a0308963f08d01712b3079530a8c188e8c89d9b390c6ee06f05", // sd hash
-		"a2f1841bb9c5f3b583ac3b8c07ee1a5bf9cc48923721c30d5ca6318615776c284e8936d72fa4db7fdda2e4e9598b1e6c",
-		"0c9675ad7f40f29dcd41883ed9cf7e145bbb13976d9b83ab9354f4f61a87f0f7771a56724c2aa7a5ab43c68d7942e5cb",
-		"a4d07d442b9907036c75b6c92db316a8b8428733bf5ec976627a48a7c862bf84db33075d54125a7c0b297bd2dc445f1c",
-		"dcd2093f4a3eca9f6dd59d785d0bef068fee788481986aa894cf72ed4d992c0ff9d19d1743525de2f5c3c62f5ede1c58",
-	}
+var testdataBlobHashes = []string{
+	"1bf7d39c45d1a38ffa74bff179bf7f67d400ff57fa0b5a0308963f08d01712b3079530a8c188e8c89d9b390c6ee06f05", // sd hash
+	"a2f1841bb9c5f3b583ac3b8c07ee1a5bf9cc48923721c30d5ca6318615776c284e8936d72fa4db7fdda2e4e9598b1e6c",
+	"0c9675ad7f40f29dcd41883ed9cf7e145bbb13976d9b83ab9354f4f61a87f0f7771a56724c2aa7a5ab43c68d7942e5cb",
+	"a4d07d442b9907036c75b6c92db316a8b8428733bf5ec976627a48a7c862bf84db33075d54125a7c0b297bd2dc445f1c",
+	"dcd2093f4a3eca9f6dd59d785d0bef068fee788481986aa894cf72ed4d992c0ff9d19d1743525de2f5c3c62f5ede1c58",
+}
 
-	stream := make(Stream, len(blobHashes))
-	for i, hash := range blobHashes {
+func TestStreamToFile(t *testing.T) {
+	stream := make(Stream, len(testdataBlobHashes))
+	for i, hash := range testdataBlobHashes {
 		stream[i] = testdata(t, hash)
 	}
 
-	data, err := stream.Data()
+	data, err := stream.Decode()
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -33,6 +35,8 @@ func TestStreamToFile(t *testing.T) {
 		t.Errorf("file length mismatch. got %d, expected %d", actualLen, expectedLen)
 	}
 
+	expectedFileHash := sha512.Sum384(data)
+
 	expectedSha256 := unhex(t, "51e4d03bd6d69ea17d1be3ce01fdffa44ffe053f2dbce8d42a50283b2890fea2")
 	actualSha256 := sha256.Sum256(data)
 
@@ -46,22 +50,104 @@ func TestStreamToFile(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	newStream, err := Reconstruct(data, *sdBlob)
+	enc := NewEncoderFromSD(bytes.NewBuffer(data), sdBlob)
+	newStream, err := enc.Stream()
 	if err != nil {
 		t.Fatal(err)
 	}
 
-	if len(newStream) != len(blobHashes) {
-		t.Fatalf("stream length mismatch. got %d blobs, expected %d", len(newStream), len(blobHashes))
+	if len(newStream) != len(testdataBlobHashes) {
+		t.Fatalf("stream length mismatch. got %d blobs, expected %d", len(newStream), len(testdataBlobHashes))
 	}
 
-	for i, hash := range blobHashes {
+	if enc.SourceLen() != expectedLen {
+		t.Errorf("reconstructed file length mismatch. got %d, expected %d", enc.SourceLen(), expectedLen)
+	}
+
+	if !bytes.Equal(enc.SourceHash(), expectedFileHash[:]) {
+		t.Errorf("reconstructed file hash mismatch. got %s, expected %s", hex.EncodeToString(enc.SourceHash()), hex.EncodeToString(expectedFileHash[:]))
+	}
+
+	for i, hash := range testdataBlobHashes {
 		if newStream[i].HashHex() != hash {
 			t.Errorf("blob %d hash mismatch. got %s, expected %s", i, newStream[i].HashHex(), hash)
 		}
 	}
 }
 
got %s, expected %s", i, newStream[i].HashHex(), hash) } } } +func TestMakeStream(t *testing.T) { + blobsToRead := 3 + totalBlobs := blobsToRead + 3 + + data := make([]byte, ((totalBlobs-1)*maxBlobDataSize)+1000) // last blob is partial + _, err := rand.Read(data) + if err != nil { + t.Fatal(err) + } + + buf := bytes.NewBuffer(data) + + enc := NewEncoder(buf) + + stream := make(Stream, blobsToRead+1) // +1 for sd blob + for i := 1; i < blobsToRead+1; i++ { // start at 1 to skip sd blob + stream[i], err = enc.Next() + if err != nil { + t.Fatal(err) + } + } + + sdBlob := enc.SDBlob() + + if len(sdBlob.BlobInfos) != blobsToRead { + t.Errorf("expected %d blobs in partial sdblob, got %d", blobsToRead, len(sdBlob.BlobInfos)) + } + if enc.SourceLen() != maxBlobDataSize*blobsToRead { + t.Errorf("expected length of %d , got %d", maxBlobDataSize*blobsToRead, enc.SourceLen()) + } + + // now finish the stream, reusing key and IVs + + buf = bytes.NewBuffer(data) // rewind to the beginning of the data + + enc = NewEncoderFromSD(buf, sdBlob) + + reconstructedStream, err := enc.Stream() + if err != nil { + t.Fatal(err) + } + + if len(reconstructedStream) != totalBlobs+1 { // +1 for the terminating blob at the end + t.Errorf("expected %d blobs in stream, got %d", totalBlobs+1, len(reconstructedStream)) + } + if enc.SourceLen() != len(data) { + t.Errorf("expected length of %d , got %d", len(data), enc.SourceLen()) + } + + reconstructedSDBlob := enc.SDBlob() + + for i := 0; i < len(sdBlob.BlobInfos); i++ { + if !bytes.Equal(sdBlob.BlobInfos[i].IV, reconstructedSDBlob.BlobInfos[i].IV) { + t.Errorf("blob info %d of reconstructed sd blobd does not match original sd blob", i) + } + } + for i := 1; i < len(stream); i++ { // start at 1 to skip sd blob + if !bytes.Equal(stream[i], reconstructedStream[i]) { + t.Errorf("blob %d of reconstructed stream does not match original stream", i) + } + } +} + +func TestSizeHint(t *testing.T) { + b := make([]byte, 12) + + newStream, err := NewEncoder(bytes.NewBuffer(b)).SourceSizeHint(5 * maxBlobDataSize).Stream() + if err != nil { + t.Fatal(err) + } + + if cap(newStream) != 2 { // 1 for sd blob, 1 for the 12 bytes of the actual stream + t.Fatalf("expected 2 blobs allocated, got %d", cap(newStream)) + } +} + func TestNew(t *testing.T) { t.Skip("TODO: test new stream creation and decryption") }