// tdb/table.go
//
// 427 lines
// 9.7 KiB
// Go

package tdb
import (
"errors"
"fmt"
"git.keganmyers.com/terribleplan/tdb/stringy"
"github.com/golang/protobuf/proto"
bolt "go.etcd.io/bbolt"
)
// Table is the public handle for one typed collection of protobuf records.
// It embeds debugLogger, Iterable and Transactable and adds the
// create/read/modify operations below. Each operation that touches storage
// takes optional *Tx values as a trailing variadic argument.
type Table interface {
	debugLogger
	Iterable
	Transactable

	// Creating records.

	// Create stores thing as a new record and returns the ID assigned to it.
	Create(thing proto.Message, txs ...*Tx) (uint64, error)
	// CreateOrPanic is Create, but panics instead of returning an error.
	CreateOrPanic(thing proto.Message, txs ...*Tx) uint64

	// Reading records.

	// Get looks up the record stored under id.
	Get(id uint64, txs ...*Tx) (proto.Message, error)
	// Query starts an empty query against the table.
	Query() Query
	// Where starts a query with a single condition already applied.
	Where(fieldName, op string, value interface{}) Query

	// Modifying records.

	// Put stores value under the ID carried in its ID field.
	Put(value proto.Message, txs ...*Tx) error
	// Update loads the record stored under id, lets action modify it, and
	// writes the result back.
	Update(id uint64, action func(proto.Message) error, txs ...*Tx) error
}
// TableSetup is the schema-building view of a table. newTable passes the
// freshly constructed table to the CreateTableSchema callback as a TableSetup
// so the callback can register indexes on it.
type TableSetup interface {
debugLogger
// AddIndex registers an index built from options and returns an error if
// the index cannot be registered.
AddIndex(options SimpleIndexOptions) error
// AddIndexOrPanic is AddIndex, but panics instead of returning an error.
AddIndexOrPanic(options SimpleIndexOptions)
// AddArrayIndex registers an array index built from options and returns an
// error if the index cannot be registered.
AddArrayIndex(options ArrayIndexOptions) error
// AddArrayIndexOrPanic is AddArrayIndex, but panics instead of returning an
// error.
AddArrayIndexOrPanic(options ArrayIndexOptions)
}
// IndexQueryOpts carries options for querying an index.
type IndexQueryOpts struct {
// NOTE(review): presumably selects descending iteration order; the field is
// not used in this file, so confirm against the index/query code.
Desc bool
}
// CreateTableSchema is the callback newTable invokes with the new table (as a
// TableSetup) so the caller can declare its indexes. Returning a non-nil error
// makes newTable fail with that error.
type CreateTableSchema func(createSchema TableSetup) error
// table is the concrete implementation behind Table and TableSetup.
type table struct {
// db is the owning database; logging and transaction helpers delegate to it.
db *db
// name is the record type's name (t.Name); it appears in log and error
// messages.
name string
// nameBytes is name as bytes, used as the key of the table's bolt bucket.
nameBytes []byte
// t describes the record type stored in this table.
t *dbType
// tPtr is t.PtrType(); it is used to type-check incoming values and to
// produce zero values and unmarshal stored bytes.
tPtr *dbPtrType
// idField identifies the field that holds a record's ID; Create writes the
// new ID into it and Put reads the ID from it.
idField dbField
// indicies maps an indexed field name to its index.
indicies map[string]indexish
// constraints maps a field name to the constraint checked by validate.
// AddIndex and AddArrayIndex register each index here as well.
constraints map[string]constraintish
}
// newTable builds the in-memory descriptor for a table holding records of
// type t, then hands it to createSchema so indexes can be registered. If
// createSchema fails, its error is returned and no table is produced.
func newTable(db *db, t *dbType, idField dbField, createSchema CreateTableSchema) (*table, error) {
	db.debugLogf("Creating table for %s", t.Name)

	tbl := new(table)
	tbl.db = db
	tbl.name = t.Name
	tbl.nameBytes = []byte(t.Name)
	tbl.t = t
	tbl.tPtr = t.PtrType()
	tbl.idField = idField
	tbl.indicies = map[string]indexish{}
	tbl.constraints = map[string]constraintish{}

	if err := createSchema(tbl); err != nil {
		return nil, err
	}
	return tbl, nil
}
// debugLog forwards message to the owning database's debug logger.
func (t *table) debugLog(message string) {
	owner := t.db
	owner.debugLog(message)
}
// debugLogf forwards a format string and its arguments to the owning
// database's formatted debug logger.
func (t *table) debugLogf(f string, args ...interface{}) {
	owner := t.db
	owner.debugLogf(f, args...)
}
// bucket returns the bolt bucket keyed by the table's name within tx.
func (t *table) bucket(tx *Tx) *bolt.Bucket {
	boltTx := tx.tx()
	return boltTx.Bucket(t.nameBytes)
}
// AddIndex registers a simple index on options.Field.
//
// options.Table is always overwritten with this table's name; a debug warning
// is logged if the caller supplied a different, non-empty name. An error is
// returned if the field already has an index or a constraint, or if the index
// cannot be constructed. On success the index is recorded both as an index
// and as a constraint for the field.
func (t *table) AddIndex(options SimpleIndexOptions) error {
	field := options.Field

	if requested := options.Table; requested != "" && requested != t.name {
		t.debugLogf("warn: ignoring table name in index creation options, leave blank to disable this warning (got '%s')", requested)
	}

	if _, taken := t.indicies[field]; taken {
		return fmt.Errorf("There is already an index on '%s'.'%s'", t.name, field)
	}
	if _, taken := t.constraints[field]; taken {
		return fmt.Errorf("There are already constraints on '%s'.'%s'", t.name, field)
	}

	options.Table = t.name
	idx, err := newSimpleIndex(t, options)
	if err != nil {
		return err
	}

	t.indicies[field] = idx
	t.constraints[field] = idx
	return nil
}
// AddIndexOrPanic calls AddIndex and panics with the returned error, if any.
func (t *table) AddIndexOrPanic(options SimpleIndexOptions) {
	err := t.AddIndex(options)
	if err == nil {
		return
	}
	panic(err)
}
// AddArrayIndex registers an array index on options.Field.
//
// options.Table is always overwritten with this table's name; a debug warning
// is logged if the caller supplied a different, non-empty name. An error is
// returned if the field already has an index or a constraint, or if the index
// cannot be constructed. On success the index is recorded both as an index
// and as a constraint for the field.
func (t *table) AddArrayIndex(options ArrayIndexOptions) error {
	field := options.Field

	if requested := options.Table; requested != "" && requested != t.name {
		t.debugLogf("warn: ignoring table name in index creation options, leave blank to disable this warning (got '%s')", requested)
	}

	if _, taken := t.indicies[field]; taken {
		return fmt.Errorf("There is already an index on '%s'.'%s'", t.name, field)
	}
	if _, taken := t.constraints[field]; taken {
		return fmt.Errorf("There are already constraints on '%s'.'%s'", t.name, field)
	}

	options.Table = t.name
	idx, err := newArrayIndex(t, options)
	if err != nil {
		return err
	}

	t.indicies[field] = idx
	t.constraints[field] = idx
	return nil
}
// AddArrayIndexOrPanic calls AddArrayIndex and panics with the returned
// error, if any.
func (t *table) AddArrayIndexOrPanic(options ArrayIndexOptions) {
	err := t.AddArrayIndex(options)
	if err == nil {
		return
	}
	panic(err)
}
// Create inserts thing as a new record under the next free ID in the table's
// bucket sequence and returns that ID.
//
// thing must be of the table's pointer type; otherwise an error is returned
// before any storage work is attempted. The allocated ID is written into
// thing's ID field before validation, so thing is modified even when a later
// step fails. All storage work runs inside writeTxHelper with the optional
// caller-supplied txs.
//
// Errors from sequence allocation, constraint validation, marshalling and the
// bucket write are all returned to the caller; on error the returned ID is 0.
func (t *table) Create(thing proto.Message, txs ...*Tx) (uint64, error) {
	t.debugLogf("[table.Create] Creating '%s'", t.name)

	pv := dbPtrValueOf(thing)
	if !pv.IsOfPtrType(t.tPtr) {
		return 0, fmt.Errorf("[table.Create] Expected type '%s' in call (got '%s')", t.tPtr.String(), pv.PtrType().String())
	}

	var id uint64
	err := t.writeTxHelper(func(tx *Tx) error {
		b := t.bucket(tx)

		// Advance the sequence until it yields a key that is not already
		// occupied (records written via Put can use arbitrary IDs).
		var idString []byte
		for {
			next, err := b.NextSequence()
			if err != nil {
				// Without this check a failing sequence would leave id at
				// its previous value and the record could be written under
				// the wrong key.
				return err
			}
			id = next
			idString = []byte(stringy.LiteralUintToString(id))
			if b.Get(idString) == nil {
				break
			}
		}
		t.debugLogf("[table.Create] New '%s' will have Id '%d'", t.name, id)
		pv.dangerous_Field(t.idField).SetUint(id)

		// Validate inside the transaction we are already running in, exactly
		// as Put does, instead of forwarding the caller's (possibly empty)
		// txs to the read helper.
		if err := t.validate(pv, tx); err != nil {
			return err
		}

		data, err := pv.Marshal()
		if err != nil {
			return err
		}
		if err := b.Put(idString, data); err != nil {
			return err
		}

		// A zero "before" value marks this as a fresh insert for the indexes.
		t.updateIndicies(tx, t.tPtr.Zero(), pv)
		t.debugLogf("[table.Create] Created '%s' with Id '%d'", t.name, id)
		return nil
	}, txs...)
	if err != nil {
		return 0, err
	}
	return id, nil
}
// Put stores thing under the ID already present in its ID field, replacing
// whatever was stored under that ID before.
//
// thing must be of the table's pointer type. The value is marshalled before
// the write helper is entered; inside it the previous value is loaded, the
// table's constraints are validated, the bytes are written and the indexes
// are updated from the previous value to the new one.
func (t *table) Put(thing proto.Message, txs ...*Tx) error {
	t.debugLogf("[table.Put] Putting '%s'", t.name)

	pv := dbPtrValueOf(thing)
	if ok := pv.IsOfPtrType(t.tPtr); !ok {
		return fmt.Errorf("[table.Put] Expected type '%s' in call (got '%s')", t.tPtr.String(), pv.PtrType().String())
	}

	key := []byte(stringy.LiteralUintToString(pv.dangerous_Field(t.idField).Uint()))
	encoded, err := pv.Marshal()
	if err != nil {
		return err
	}

	store := func(tx *Tx) error {
		b := t.bucket(tx)

		previous, err := t.getValWithinTx(b, key)
		if err != nil {
			return err
		}
		if err = t.validate(pv, tx); err != nil {
			return err
		}
		if err = b.Put(key, encoded); err != nil {
			return err
		}

		t.updateIndicies(tx, previous, pv)
		return nil
	}
	return t.writeTxHelper(store, txs...)
}
// CreateOrPanic calls Create and returns the new ID, panicking with the
// error if Create fails.
func (t *table) CreateOrPanic(thing proto.Message, txs ...*Tx) uint64 {
	newID, err := t.Create(thing, txs...)
	if err == nil {
		return newID
	}
	panic(err)
}
// Get looks up the record stored under id and returns it as a proto.Message.
//
// NOTE(review): the key is built with stringy.ToStringOrPanic, whereas
// Create, Put and Update build theirs with stringy.LiteralUintToString;
// confirm both yield the same bytes for a uint64.
func (t *table) Get(id uint64, txs ...*Tx) (proto.Message, error) {
	// TODO: replace with a query once the query engine can optimize .Id = x.
	key := []byte(stringy.ToStringOrPanic(id))
	return t.getRaw(key, txs...)
}
// getRaw fetches the record stored under the already-encoded key id, running
// the lookup through readTxHelper with the optional caller-supplied txs.
//
// The helper is invoked as its own statement, before the result is read. The
// original form `return vProtoMessage, t.readTxHelper(...)` read a variable
// that the callback assigns in the same return statement, and Go does not
// specify whether that read happens before or after the call.
//
// On any error the returned message is nil.
func (t *table) getRaw(id []byte, txs ...*Tx) (vProtoMessage proto.Message, err error) {
	err = t.readTxHelper(func(tx *Tx) error {
		msg, getErr := t.getWithinTx(t.bucket(tx), id)
		if getErr != nil {
			return getErr
		}
		vProtoMessage = msg
		return nil
	}, txs...)
	if err != nil {
		return nil, err
	}
	return vProtoMessage, nil
}
// getValWithinTx reads the bytes stored under id in bucket b and unmarshals
// them into a value of the table's type. When nothing is stored under id it
// returns the type's zero value and a nil error rather than failing.
func (t *table) getValWithinTx(b *bolt.Bucket, id []byte) (dbPtrValue, error) {
	t.debugLogf("[table.getValWithinTx] looking up '%s'", id)
	// TODO: replace with a query once the query engine can optimize .Id = x.
	if raw := b.Get(id); raw != nil {
		return t.tPtr.Unmarshal(raw)
	}
	t.debugLogf("got nil for '%s'", id)
	return t.tPtr.Zero(), nil
}
// getWithinTx is getValWithinTx with the result converted to a proto.Message.
// A lookup error yields a nil message together with that error.
func (t *table) getWithinTx(b *bolt.Bucket, id []byte) (proto.Message, error) {
	loaded, err := t.getValWithinTx(b, id)
	if err == nil {
		return loaded.Proto(), nil
	}
	return nil, err
}
// Query returns a new query bound to this table with an empty, non-nil list
// of operations.
func (t *table) Query() Query {
	q := new(queryData)
	q.table = t
	q.ops = []queryOpish{}
	return q
}
// Where is shorthand for starting a fresh Query and applying a single
// condition to it.
func (t *table) Where(fieldName, op string, value interface{}) Query {
	q := t.Query()
	return q.Where(fieldName, op, value)
}
// Update loads the record stored under id, passes it to action for in-place
// modification, and writes the result back, all inside writeTxHelper with
// the optional caller-supplied txs.
//
// It returns an error if no record exists under id, if the stored bytes
// cannot be unmarshalled, if action returns an error, if the modified record
// violates one of the table's constraints, or if marshalling or the bucket
// write fails. When action or validation fails nothing is written.
func (t *table) Update(id uint64, action func(proto.Message) error, txs ...*Tx) error {
	idBytes := []byte(stringy.LiteralUintToString(id))
	return t.writeTxHelper(func(tx *Tx) error {
		b := t.bucket(tx)
		v := b.Get(idBytes)
		if v == nil {
			t.debugLogf("got nil for '%s'", idBytes)
			return fmt.Errorf("No such entry '%d' in table '%s'", id, t.name)
		}

		// Decode the stored bytes twice: `original` stays untouched so the
		// indexes can be moved from the old values to the new ones, while
		// `updated` is the copy handed to action.
		original, err := t.tPtr.Unmarshal(v)
		if err != nil {
			return err
		}
		updated, err := t.tPtr.Unmarshal(v)
		if err != nil {
			return err
		}

		if err := action(updated.Proto()); err != nil {
			return err
		}

		// Enforce the table's constraints on the modified record before it
		// is written, matching what Create and Put do.
		if err := t.validate(updated, tx); err != nil {
			return err
		}

		data, err := updated.Marshal()
		if err != nil {
			return err
		}
		if err := b.Put(idBytes, data); err != nil {
			return err
		}

		t.updateIndicies(tx, original, updated)
		return nil
	}, txs...)
}
// iterateRaw walks every key/value pair in the table's bucket in cursor
// order, unmarshals each value and passes it to i. The walk runs through
// readTxHelper with the optional caller-supplied txs.
//
// Iteration stops early, without error, when i returns StopIteration. It
// stops with an error when i returns one or when a stored value cannot be
// unmarshalled; a value that failed to decode is never handed to i.
func (t *table) iterateRaw(i rawIterator, txs ...*Tx) error {
	return t.readTxHelper(func(tx *Tx) error {
		c := t.bucket(tx).Cursor()
		for k, v := c.First(); k != nil; k, v = c.Next() {
			t.debugLogf("iterating over '%s' '%s'", t.name, k)

			pv, err := t.tPtr.Unmarshal(v)
			if err != nil {
				t.debugLogf("[table.iterateRaw] error while iterating over '%s' '%s'", t.name, k)
				// Surface the decode failure rather than continuing with a
				// value that did not unmarshal.
				return err
			}

			signal, err := i(pv)
			if err != nil {
				return err
			}
			if signal == StopIteration {
				break
			}
		}
		return nil
	}, txs...)
}
// Iterate calls i with every record in the table as a proto.Message. It
// adapts i to the internal raw iterator and delegates to iterateRaw.
func (t *table) Iterate(i Iterator, txs ...*Tx) error {
	asProto := func(pv dbPtrValue) (IterationSignal, error) {
		msg := pv.Proto()
		return i(msg)
	}
	return t.iterateRaw(asProto, txs...)
}
// IterateKeys is not implemented for tables; calling it always panics with
// an "unimplemented" error.
func (t *table) IterateKeys(i KeyIterator, txs ...*Tx) error {
	notImplemented := errors.New("unimplemented")
	panic(notImplemented)
}
// initialize makes sure the table's bucket exists in tx and then initializes
// every registered index.
//
// The bucket-creation error is checked immediately: if the bucket cannot be
// created, the indexes are not touched and that error is returned. The first
// index that fails to initialize stops the process and its error is returned.
func (t *table) initialize(tx *Tx) error {
	if _, err := tx.tx().CreateBucketIfNotExists(t.nameBytes); err != nil {
		return err
	}
	for _, index := range t.indicies {
		if err := index.initialize(tx); err != nil {
			return err
		}
	}
	return nil
}
// validate checks pv against every constraint registered on the table and
// returns the first constraint error encountered. A nil pv is accepted
// without consulting any constraint. The checks run through readTxHelper
// with the optional caller-supplied txs.
func (t *table) validate(pv dbPtrValue, txs ...*Tx) error {
	if pv.IsNil() {
		return nil
	}

	checkAll := func(tx *Tx) error {
		for _, constraint := range t.constraints {
			err := constraint.validate(tx, pv)
			if err != nil {
				return err
			}
		}
		return nil
	}
	return t.readTxHelper(checkAll, txs...)
}
// putIndicies asks every registered index to record the value after.
func (t *table) putIndicies(tx *Tx, after dbPtrValue) {
	for field := range t.indicies {
		t.indicies[field].put(tx, after)
	}
}
// deleteIndicies asks every registered index to drop the value before.
func (t *table) deleteIndicies(tx *Tx, before dbPtrValue) {
	for field := range t.indicies {
		t.indicies[field].delete(tx, before)
	}
}
// updateIndiciesRaw asks every registered index to move from the value
// before to the value after. It performs no nil checks of its own.
func (t *table) updateIndiciesRaw(tx *Tx, before, after dbPtrValue) {
	for field := range t.indicies {
		t.indicies[field].update(tx, before, after)
	}
}
// updateIndicies brings the indexes in line with a record changing from
// before to after. A nil before is treated as an insert, a nil after as a
// delete, and anything else as an in-place update. The before check takes
// precedence when both are nil.
func (t *table) updateIndicies(tx *Tx, before, after dbPtrValue) {
	switch {
	case before.IsNil():
		t.putIndicies(tx, after)
	case after.IsNil():
		t.deleteIndicies(tx, before)
	default:
		t.updateIndiciesRaw(tx, before, after)
	}
}
// ReadTx hands ta to the owning database's ReadTx and returns its result.
func (t *table) ReadTx(ta Transaction) error {
	owner := t.db
	return owner.ReadTx(ta)
}
// readTxHelper hands ta and the optional txs to the owning database's
// readTxHelper and returns its result.
func (t *table) readTxHelper(ta Transaction, txs ...*Tx) error {
	owner := t.db
	return owner.readTxHelper(ta, txs...)
}
// WriteTx hands ta to the owning database's WriteTx and returns its result.
func (t *table) WriteTx(ta Transaction) error {
	owner := t.db
	return owner.WriteTx(ta)
}
// writeTxHelper hands ta and the optional txs to the owning database's
// writeTxHelper and returns its result.
func (t *table) writeTxHelper(ta Transaction, txs ...*Tx) error {
	owner := t.db
	return owner.writeTxHelper(ta, txs...)
}