package tdb import ( "errors" "fmt" "git.keganmyers.com/terribleplan/tdb/stringy" "github.com/golang/protobuf/proto" bolt "go.etcd.io/bbolt" ) type Table interface { debugLogger Iterable Transactable // New Create(proto.Message, ...*Tx) (uint64, error) CreateOrPanic(proto.Message, ...*Tx) uint64 // Read Get(uint64, ...*Tx) (proto.Message, error) Query() Query Where(fieldName, op string, value interface{}) Query // Modify Put(value proto.Message, txs ...*Tx) error Update(id uint64, action func(proto.Message) error, txs ...*Tx) error } type TableSetup interface { debugLogger AddIndex(options SimpleIndexOptions) error AddIndexOrPanic(options SimpleIndexOptions) AddArrayIndex(options ArrayIndexOptions) error AddArrayIndexOrPanic(options ArrayIndexOptions) } type IndexQueryOpts struct { Desc bool } type CreateTableSchema func(createSchema TableSetup) error type table struct { db *db name string nameBytes []byte t *dbType tPtr *dbPtrType idField dbField indicies map[string]indexish constraints map[string]constraintish } func newTable(db *db, t *dbType, idField dbField, createSchema CreateTableSchema) (*table, error) { db.debugLogf("Creating table for %s", t.Name) ktbl := &table{ db: db, name: t.Name, nameBytes: []byte(t.Name), t: t, tPtr: t.PtrType(), idField: idField, indicies: make(map[string]indexish), constraints: make(map[string]constraintish), } err := createSchema(ktbl) if err != nil { return nil, err } return ktbl, nil } func (t *table) debugLog(message string) { t.db.debugLog(message) } func (t *table) debugLogf(f string, args ...interface{}) { t.db.debugLogf(f, args...) } func (t *table) bucket(tx *Tx) *bolt.Bucket { return tx.tx().Bucket(t.nameBytes) } func (t *table) AddIndex(options SimpleIndexOptions) error { if options.Table != "" && options.Table != t.name { t.debugLogf("warn: ignoring table name in index creation options, leave blank to disable this warning (got '%s')", options.Table) } if _, exists := t.indicies[options.Field]; exists { return fmt.Errorf("There is already an index on '%s'.'%s'", t.name, options.Field) } if _, exists := t.constraints[options.Field]; exists { return fmt.Errorf("There are already constraints on '%s'.'%s'", t.name, options.Field) } options.Table = t.name index, err := newSimpleIndex(t, options) if err != nil { return err } t.indicies[options.Field] = index t.constraints[options.Field] = index return nil } func (t *table) AddIndexOrPanic(options SimpleIndexOptions) { if err := t.AddIndex(options); err != nil { panic(err) } } func (t *table) AddArrayIndex(options ArrayIndexOptions) error { if options.Table != "" && options.Table != t.name { t.debugLogf("warn: ignoring table name in index creation options, leave blank to disable this warning (got '%s')", options.Table) } if _, exists := t.indicies[options.Field]; exists { return fmt.Errorf("There is already an index on '%s'.'%s'", t.name, options.Field) } if _, exists := t.constraints[options.Field]; exists { return fmt.Errorf("There are already constraints on '%s'.'%s'", t.name, options.Field) } options.Table = t.name index, err := newArrayIndex(t, options) if err != nil { return err } t.indicies[options.Field] = index t.constraints[options.Field] = index return nil } func (t *table) AddArrayIndexOrPanic(options ArrayIndexOptions) { if err := t.AddArrayIndex(options); err != nil { panic(err) } } // Create will insert a record with the next available ID in sequence func (t *table) Create(thing proto.Message, txs ...*Tx) (uint64, error) { t.debugLogf("[table.Create] Creating '%s'", t.name) var id uint64 pv := dbPtrValueOf(thing) if !pv.IsOfPtrType(t.tPtr) { return 0, fmt.Errorf("[table.Create] Expected type '%s' in call (got '%s')", t.tPtr.String(), pv.PtrType().String()) } if err := t.writeTxHelper(func(tx *Tx) error { var idString []byte b := t.bucket(tx) for { id, _ = b.NextSequence() idString = []byte(stringy.LiteralUintToString(id)) if b.Get(idString) == nil { break } } t.debugLogf("[table.Create] New '%s' will have Id '%d'", t.name, id) pv.dangerous_Field(t.idField).SetUint(id) if err := t.validate(pv, txs...); err != nil { return err } data, err := pv.Marshal() if err != nil { return err } b.Put(idString, data) t.updateIndicies(tx, t.tPtr.Zero(), pv) t.debugLogf("[table.Create] Created '%s' with Id '%d'", t.name, id) return nil }, txs...); err != nil { return 0, err } return id, nil } func (t *table) Put(thing proto.Message, txs ...*Tx) error { t.debugLogf("[table.Put] Putting '%s'", t.name) pv := dbPtrValueOf(thing) if !pv.IsOfPtrType(t.tPtr) { return fmt.Errorf("[table.Put] Expected type '%s' in call (got '%s')", t.tPtr.String(), pv.PtrType().String()) } id := pv.dangerous_Field(t.idField).Uint() idString := []byte(stringy.LiteralUintToString(id)) data, err := pv.Marshal() if err != nil { return err } if err := t.writeTxHelper(func(tx *Tx) error { b := t.bucket(tx) old, err := t.getValWithinTx(b, idString) if err != nil { return err } if err := t.validate(pv, tx); err != nil { return err } if err := b.Put(idString, data); err != nil { return err } t.updateIndicies(tx, old, pv) return nil }, txs...); err != nil { return err } return nil } func (t *table) CreateOrPanic(thing proto.Message, txs ...*Tx) uint64 { id, err := t.Create(thing, txs...) if err != nil { panic(err) } return id } func (t *table) Get(id uint64, txs ...*Tx) (proto.Message, error) { //todo: replace with a query? (once the query engine can optimize .Id = x) return t.getRaw([]byte(stringy.ToStringOrPanic(id)), txs...) } func (t *table) getRaw(id []byte, txs ...*Tx) (vProtoMessage proto.Message, err error) { return vProtoMessage, t.readTxHelper(func(tx *Tx) error { vProtoMessage, err = t.getWithinTx(t.bucket(tx), id) if err != nil { return err } return nil }, txs...) } func (t *table) getValWithinTx(b *bolt.Bucket, id []byte) (dbPtrValue, error) { t.debugLogf("[table.getValWithinTx] looking up '%s'", id) //todo: replace with a query? (once the query engine can optimize .Id = x) v := b.Get([]byte(id)) if v == nil { t.debugLogf("got nil for '%s'", id) return t.tPtr.Zero(), nil } return t.tPtr.Unmarshal(v) } func (t *table) getWithinTx(b *bolt.Bucket, id []byte) (proto.Message, error) { pv, err := t.getValWithinTx(b, id) if err != nil { return nil, err } return pv.Proto(), nil } func (t *table) Query() Query { return &queryData{ table: t, ops: make([]queryOpish, 0), } } func (t *table) Where(fieldName, op string, value interface{}) Query { return t.Query().Where(fieldName, op, value) } func (t *table) Update(id uint64, action func(proto.Message) error, txs ...*Tx) error { idBytes := []byte(stringy.LiteralUintToString(id)) return t.writeTxHelper(func(tx *Tx) error { b := t.bucket(tx) v := b.Get(idBytes) if v == nil { t.debugLogf("got nil for '%s'", idBytes) return fmt.Errorf("No such entry '%d' in table '%s'", id, t.name) } original, err := t.tPtr.Unmarshal(v) if err != nil { return err } updated, err := t.tPtr.Unmarshal(v) if err != nil { return err } if err := action(updated.Proto()); err != nil { return err } data, err := updated.Marshal() if err != nil { return err } err = b.Put(idBytes, data) if err != nil { return err } t.updateIndicies(tx, original, updated) return nil }, txs...) } func (t *table) iterateRaw(i rawIterator, txs ...*Tx) error { return t.readTxHelper(func(tx *Tx) error { c := t.bucket(tx).Cursor() for k, v := c.First(); k != nil; k, v = c.Next() { t.debugLogf("iterating over '%s' '%s'", t.name, k) pv, err := t.tPtr.Unmarshal(v) if err != nil { t.debugLogf("[table.iterateRaw] error while iterating over '%s' '%s'", t.name, k) } signal, err := i(pv) if err != nil { return err } if signal == StopIteration { break } } return nil }, txs...) } func (t *table) Iterate(i Iterator, txs ...*Tx) error { return t.iterateRaw(func(pv dbPtrValue) (IterationSignal, error) { return i(pv.Proto()) }, txs...) } func (t *table) IterateKeys(i KeyIterator, txs ...*Tx) error { panic(errors.New("unimplemented")) } func (t *table) initialize(tx *Tx) error { _, err := tx.tx().CreateBucketIfNotExists(t.nameBytes) for _, index := range t.indicies { if err := index.initialize(tx); err != nil { return err } } return err } func (t *table) validate(pv dbPtrValue, txs ...*Tx) error { if pv.IsNil() { return nil } return t.readTxHelper(func(tx *Tx) error { for _, c := range t.constraints { if err := c.validate(tx, pv); err != nil { return err } } return nil }, txs...) } func (t *table) putIndicies(tx *Tx, after dbPtrValue) { for _, index := range t.indicies { index.put(tx, after) } } func (t *table) deleteIndicies(tx *Tx, before dbPtrValue) { for _, index := range t.indicies { index.delete(tx, before) } } func (t *table) updateIndiciesRaw(tx *Tx, before, after dbPtrValue) { for _, index := range t.indicies { index.update(tx, before, after) } } func (t *table) updateIndicies(tx *Tx, before, after dbPtrValue) { if before.IsNil() { t.putIndicies(tx, after) return } if after.IsNil() { t.deleteIndicies(tx, before) return } t.updateIndiciesRaw(tx, before, after) return } func (t *table) ReadTx(ta Transaction) error { return t.db.ReadTx(ta) } func (t *table) readTxHelper(ta Transaction, txs ...*Tx) error { return t.db.readTxHelper(ta, txs...) } func (t *table) WriteTx(ta Transaction) error { return t.db.WriteTx(ta) } func (t *table) writeTxHelper(ta Transaction, txs ...*Tx) error { return t.db.writeTxHelper(ta, txs...) }