context.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152
  1. package context
  2. import (
  3. "bytes"
  4. "encoding/binary"
  5. "io"
  6. "io/ioutil"
  7. "os"
  8. "path/filepath"
  9. "sync"
  10. "time"
  11. "github.com/syndtr/goleveldb/leveldb"
  12. "github.com/syndtr/goleveldb/leveldb/opt"
  13. )
  14. const (
  15. routesDbFilename = "routes.db"
  16. idLogFilename = "id"
  17. idBatchSize uint64 = 1000
  18. )
// Route is the record stored in the routes database: a URL together with a
// timestamp.
type Route struct {
	// URL is stored as the raw bytes of the string, following the timestamp.
	URL string
	// Time is stored as nanoseconds since the Unix epoch, so only the instant
	// (not the location) survives a write/read round trip.
	Time time.Time
}
  24. //
  25. func (o *Route) write(w io.Writer) error {
  26. if err := binary.Write(w, binary.LittleEndian, o.Time.UnixNano()); err != nil {
  27. return err
  28. }
  29. if _, err := w.Write([]byte(o.URL)); err != nil {
  30. return err
  31. }
  32. return nil
  33. }
  34. //
  35. func (o *Route) read(r io.Reader) error {
  36. var t int64
  37. if err := binary.Read(r, binary.LittleEndian, &t); err != nil {
  38. return err
  39. }
  40. b, err := ioutil.ReadAll(r)
  41. if err != nil {
  42. return err
  43. }
  44. o.URL = string(b)
  45. o.Time = time.Unix(0, t)
  46. return nil
  47. }
// Context bundles the routes database kept under a directory with the id
// counter served by NextID.
type Context struct {
	// path is the directory given to Open; the database and the id log are
	// located beneath it.
	path string
	// db is the LevelDB handle used by Get and Put.
	db *leveldb.DB
	// lck is held for the duration of each NextID call.
	lck sync.Mutex
	// id is the most recent id handed out by NextID.
	id uint64
}
  55. // Open ...
  56. func Open(path string) (*Context, error) {
  57. if _, err := os.Stat(path); err != nil {
  58. if err := os.MkdirAll(path, os.ModePerm); err != nil {
  59. return nil, err
  60. }
  61. }
  62. // open the database
  63. db, err := leveldb.OpenFile(filepath.Join(path, routesDbFilename), nil)
  64. if err != nil {
  65. return nil, err
  66. }
  67. c := &Context{
  68. path: path,
  69. db: db,
  70. }
  71. // make sure we have an id log file
  72. if _, err := os.Stat(filepath.Join(path, idLogFilename)); err != nil {
  73. if err := c.commit(idBatchSize); err != nil {
  74. return nil, err
  75. }
  76. }
  77. return c, nil
  78. }
  79. // Get ...
  80. func (c *Context) Get(name string) (*Route, error) {
  81. val, err := c.db.Get([]byte(name), nil)
  82. if err != nil {
  83. return nil, err
  84. }
  85. rt := &Route{}
  86. if err := rt.read(bytes.NewBuffer(val)); err != nil {
  87. return nil, err
  88. }
  89. return rt, nil
  90. }
  91. // Put ...
  92. func (c *Context) Put(key string, rt *Route) error {
  93. var buf bytes.Buffer
  94. if err := rt.write(&buf); err != nil {
  95. return err
  96. }
  97. return c.db.Put([]byte(key), buf.Bytes(), &opt.WriteOptions{Sync: true})
  98. }
  99. func (c *Context) commit(id uint64) error {
  100. w, err := os.Create(filepath.Join(c.path, idLogFilename))
  101. if err != nil {
  102. return err
  103. }
  104. defer w.Close()
  105. if err := binary.Write(w, binary.LittleEndian, id); err != nil {
  106. return err
  107. }
  108. return w.Sync()
  109. }
  110. // NextID ...
  111. func (c *Context) NextID() (uint64, error) {
  112. c.lck.Lock()
  113. defer c.lck.Unlock()
  114. // when we hit a batch boundary, we will commit all ids until the next
  115. // boundary. If we crash, we'll just throw away a batch of ids in the worst
  116. // case.
  117. if c.id%idBatchSize == 0 {
  118. if err := c.commit(c.id + idBatchSize); err != nil {
  119. return 0, err
  120. }
  121. }
  122. c.id++
  123. return c.id, nil
  124. }