Websocket to notify clients about wallet updates
This commit is contained in:
parent
4f97d7761f
commit
4f074b181c
23 changed files with 879 additions and 109 deletions
2
.github/workflows/build-test-release.yml
vendored
2
.github/workflows/build-test-release.yml
vendored
|
@ -12,7 +12,7 @@ jobs:
|
|||
- name: Set up Go
|
||||
uses: actions/setup-go@v3
|
||||
with:
|
||||
go-version: 1.17
|
||||
go-version: 1.18
|
||||
|
||||
- name: Test
|
||||
run: go test -v ./...
|
||||
|
|
|
@ -10,7 +10,7 @@ Linux Only. Get the latest [release](https://github.com/lbryio/wallet-sync-serve
|
|||
|
||||
Only tried on Linux. Might work for Windows and Mac. No expectations.
|
||||
|
||||
Install Golang, at least version 1.17. (Please report any dependencies we seemed to have forgotten)
|
||||
Install Golang, at least version 1.18. (Please report any dependencies we seemed to have forgotten)
|
||||
|
||||
Check out the repo and run:
|
||||
|
||||
|
|
3
go.mod
3
go.mod
|
@ -1,8 +1,9 @@
|
|||
module lbryio/wallet-sync-server
|
||||
|
||||
go 1.17
|
||||
go 1.18
|
||||
|
||||
require (
|
||||
github.com/gorilla/websocket v1.5.0
|
||||
github.com/mailgun/mailgun-go/v4 v4.8.1
|
||||
github.com/mattn/go-sqlite3 v1.14.9
|
||||
github.com/prometheus/client_golang v1.11.0
|
||||
|
|
2
go.sum
2
go.sum
|
@ -47,6 +47,8 @@ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
|
|||
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
|
||||
github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI=
|
||||
github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
|
||||
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
|
||||
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
|
||||
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
|
||||
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
|
||||
github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
|
||||
|
|
|
@ -15,8 +15,16 @@ var (
|
|||
},
|
||||
[]string{"method"},
|
||||
)
|
||||
ErrorsCount = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Name: "wallet_sync_error_count",
|
||||
Help: "Total number of various kinds of errors",
|
||||
},
|
||||
[]string{"details"},
|
||||
)
|
||||
)
|
||||
|
||||
func init() {
|
||||
prometheus.MustRegister(RequestsCount)
|
||||
prometheus.MustRegister(ErrorsCount)
|
||||
}
|
||||
|
|
|
@ -22,7 +22,7 @@ func TestServerRegisterSuccess(t *testing.T) {
|
|||
}
|
||||
testMail := TestMail{}
|
||||
testAuth := TestAuth{TestNewVerifyTokenString: "abcd1234abcd1234abcd1234abcd1234"}
|
||||
s := Server{&testAuth, testStore, &TestEnv{env}, &testMail, TestPort}
|
||||
s := Init(&testAuth, testStore, &TestEnv{env}, &testMail, TestPort)
|
||||
|
||||
requestBody := []byte(`{"email": "abc@example.com", "password": "12345678", "clientSaltSeed": "abcd1234abcd1234abcd1234abcd1234abcd1234abcd1234abcd1234abcd1234" }`)
|
||||
|
||||
|
@ -129,7 +129,7 @@ func TestServerRegisterErrors(t *testing.T) {
|
|||
testAuth := TestAuth{TestNewVerifyTokenString: "abcd1234abcd1234abcd1234abcd1234", FailGenToken: tc.failGenToken}
|
||||
testMail := TestMail{SendVerificationEmailError: tc.mailError}
|
||||
testStore := TestStore{Errors: tc.storeErrors}
|
||||
s := Server{&testAuth, &testStore, &TestEnv{env}, &testMail, TestPort}
|
||||
s := Init(&testAuth, &testStore, &TestEnv{env}, &testMail, TestPort)
|
||||
|
||||
// Make request
|
||||
requestBody := fmt.Sprintf(`{"email": "%s", "password": "12345678", "clientSaltSeed": "abcd1234abcd1234abcd1234abcd1234abcd1234abcd1234abcd1234abcd1234"}`, tc.email)
|
||||
|
@ -227,7 +227,7 @@ func TestServerRegisterAccountVerification(t *testing.T) {
|
|||
testStore := &TestStore{}
|
||||
testAuth := TestAuth{TestNewVerifyTokenString: "abcd1234abcd1234abcd1234abcd1234"}
|
||||
testMail := TestMail{}
|
||||
s := Server{&testAuth, testStore, &TestEnv{tc.env}, &testMail, TestPort}
|
||||
s := Init(&testAuth, testStore, &TestEnv{tc.env}, &testMail, TestPort)
|
||||
|
||||
requestBody := []byte(`{"email": "abc@example.com", "password": "12345678", "clientSaltSeed": "abcd1234abcd1234abcd1234abcd1234abcd1234abcd1234abcd1234abcd1234" }`)
|
||||
|
||||
|
@ -332,7 +332,7 @@ func TestServerResendVerifyEmailSuccess(t *testing.T) {
|
|||
env := map[string]string{
|
||||
"ACCOUNT_VERIFICATION_MODE": "EmailVerify",
|
||||
}
|
||||
s := Server{&TestAuth{}, &testStore, &TestEnv{env}, &testMail, TestPort}
|
||||
s := Init(&TestAuth{}, &testStore, &TestEnv{env}, &testMail, TestPort)
|
||||
|
||||
requestBody := []byte(`{"email": "abc@example.com"}`)
|
||||
req := httptest.NewRequest(http.MethodPost, paths.PathVerify, bytes.NewBuffer(requestBody))
|
||||
|
@ -429,7 +429,7 @@ func TestServerResendVerifyEmailErrors(t *testing.T) {
|
|||
// Set this up to fail according to specification
|
||||
testStore := TestStore{Errors: tc.storeErrors}
|
||||
testMail := TestMail{SendVerificationEmailError: tc.mailError}
|
||||
s := Server{&TestAuth{}, &testStore, &TestEnv{env}, &testMail, TestPort}
|
||||
s := Init(&TestAuth{}, &testStore, &TestEnv{env}, &testMail, TestPort)
|
||||
|
||||
// Make request
|
||||
var requestBody []byte
|
||||
|
@ -468,7 +468,7 @@ func TestServerResendVerifyEmailErrors(t *testing.T) {
|
|||
|
||||
func TestServerVerifyAccountSuccess(t *testing.T) {
|
||||
testStore := TestStore{}
|
||||
s := Server{&TestAuth{}, &testStore, &TestEnv{}, &TestMail{}, TestPort}
|
||||
s := Init(&TestAuth{}, &testStore, &TestEnv{}, &TestMail{}, TestPort)
|
||||
|
||||
req := httptest.NewRequest(http.MethodGet, paths.PathVerify, nil)
|
||||
q := req.URL.Query()
|
||||
|
@ -529,7 +529,7 @@ func TestServerVerifyAccountErrors(t *testing.T) {
|
|||
|
||||
// Set this up to fail according to specification
|
||||
testStore := TestStore{Errors: tc.storeErrors}
|
||||
s := Server{&TestAuth{}, &testStore, &TestEnv{}, &TestMail{}, TestPort}
|
||||
s := Init(&TestAuth{}, &testStore, &TestEnv{}, &TestMail{}, TestPort)
|
||||
|
||||
// Make request
|
||||
req := httptest.NewRequest(http.MethodGet, paths.PathVerify, nil)
|
||||
|
|
|
@ -18,7 +18,7 @@ import (
|
|||
func TestServerAuthHandlerSuccess(t *testing.T) {
|
||||
testAuth := TestAuth{TestNewAuthTokenString: auth.AuthTokenString("seekrit")}
|
||||
testStore := TestStore{}
|
||||
s := Server{&testAuth, &testStore, &TestEnv{}, &TestMail{}, TestPort}
|
||||
s := Init(&testAuth, &testStore, &TestEnv{}, &TestMail{}, TestPort)
|
||||
|
||||
requestBody := []byte(`{"deviceId": "dev-1", "email": "abc@example.com", "password": "12345678"}`)
|
||||
|
||||
|
@ -104,7 +104,7 @@ func TestServerAuthHandlerErrors(t *testing.T) {
|
|||
if tc.authFailGenToken { // TODO - TestAuth{Errors:authErrors}
|
||||
testAuth.FailGenToken = true
|
||||
}
|
||||
server := Server{&testAuth, &testStore, &TestEnv{}, &TestMail{}, TestPort}
|
||||
server := Init(&testAuth, &testStore, &TestEnv{}, &TestMail{}, TestPort)
|
||||
|
||||
// Make request
|
||||
// So long as the JSON is well-formed, the content doesn't matter here since the password check will be stubbed out
|
||||
|
|
|
@ -67,7 +67,7 @@ func TestServerGetClientSalt(t *testing.T) {
|
|||
Errors: tc.storeErrors,
|
||||
}
|
||||
|
||||
s := Server{&testAuth, &testStore, &TestEnv{}, &TestMail{}, TestPort}
|
||||
s := Init(&testAuth, &testStore, &TestEnv{}, &TestMail{}, TestPort)
|
||||
|
||||
req := httptest.NewRequest(http.MethodGet, paths.PathClientSaltSeed, nil)
|
||||
q := req.URL.Query()
|
||||
|
|
|
@ -12,6 +12,7 @@ import (
|
|||
"reflect"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"lbryio/wallet-sync-server/auth"
|
||||
"lbryio/wallet-sync-server/server/paths"
|
||||
|
@ -99,7 +100,7 @@ func TestIntegrationWalletUpdates(t *testing.T) {
|
|||
env := map[string]string{
|
||||
"ACCOUNT_WHITELIST": "abc@example.com",
|
||||
}
|
||||
s := Server{&auth.Auth{}, &st, &TestEnv{env}, &TestMail{}, TestPort}
|
||||
s := Init(&auth.Auth{}, &st, &TestEnv{env}, &TestMail{}, TestPort)
|
||||
|
||||
////////////////////
|
||||
t.Log("Request: Register email address - any device")
|
||||
|
@ -269,7 +270,13 @@ func TestIntegrationChangePassword(t *testing.T) {
|
|||
env := map[string]string{
|
||||
"ACCOUNT_WHITELIST": "abc@example.com",
|
||||
}
|
||||
s := Server{&auth.Auth{}, &st, &TestEnv{env}, &TestMail{}, TestPort}
|
||||
s := Init(&auth.Auth{}, &st, &TestEnv{env}, &TestMail{}, TestPort)
|
||||
|
||||
// Still need to mock this until we're doing a real integration test
|
||||
// where we call Serve(), which brings up the real websocket manager.
|
||||
// Note that in the integration test, we're only using this for requests
|
||||
// that would get blocked without it.
|
||||
wsmm := wsMockManager{s: s, done: make(chan bool)}
|
||||
|
||||
////////////////////
|
||||
t.Log("Request: Register email address")
|
||||
|
@ -341,6 +348,9 @@ func TestIntegrationChangePassword(t *testing.T) {
|
|||
t.Log("Request: Change password")
|
||||
////////////////////
|
||||
|
||||
// Giving it a whole second of timeout because this request seems to be a bit
|
||||
// slow.
|
||||
go wsmm.getOneMessage(time.Second)
|
||||
var changePasswordResponse struct{}
|
||||
responseBody, statusCode = request(
|
||||
t,
|
||||
|
@ -350,6 +360,7 @@ func TestIntegrationChangePassword(t *testing.T) {
|
|||
&changePasswordResponse,
|
||||
`{"email": "abc@example.com", "oldPassword": "12345678", "newPassword": "45678901", "clientSaltSeed": "8678def95678def98678def95678def98678def95678def98678def95678def9"}`,
|
||||
)
|
||||
<-wsmm.done
|
||||
|
||||
checkStatusCode(t, statusCode, responseBody)
|
||||
|
||||
|
@ -444,6 +455,9 @@ func TestIntegrationChangePassword(t *testing.T) {
|
|||
t.Log("Request: Change password again, this time including a wallet (since there is a wallet to update)")
|
||||
////////////////////
|
||||
|
||||
// Giving it a whole second of timeout because this request seems to be a bit
|
||||
// slow.
|
||||
go wsmm.getOneMessage(time.Second)
|
||||
responseBody, statusCode = request(
|
||||
t,
|
||||
http.MethodPost,
|
||||
|
@ -460,6 +474,7 @@ func TestIntegrationChangePassword(t *testing.T) {
|
|||
"clientSaltSeed": "0000ffff0000ffff0000ffff0000ffff0000ffff0000ffff0000ffff0000ffff"
|
||||
}`),
|
||||
)
|
||||
<-wsmm.done
|
||||
|
||||
checkStatusCode(t, statusCode, responseBody)
|
||||
|
||||
|
@ -560,7 +575,7 @@ func TestIntegrationVerifyAccount(t *testing.T) {
|
|||
"ACCOUNT_VERIFICATION_MODE": "EmailVerify",
|
||||
}
|
||||
testMail := TestMail{}
|
||||
s := Server{&auth.Auth{}, &st, &TestEnv{env}, &testMail, TestPort}
|
||||
s := Init(&auth.Auth{}, &st, &TestEnv{env}, &testMail, TestPort)
|
||||
|
||||
////////////////////
|
||||
t.Log("Request: Register email address")
|
||||
|
|
|
@ -3,11 +3,16 @@ package server
|
|||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"lbryio/wallet-sync-server/auth"
|
||||
"lbryio/wallet-sync-server/metrics"
|
||||
"lbryio/wallet-sync-server/store"
|
||||
"lbryio/wallet-sync-server/wallet"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
type ChangePasswordRequest struct {
|
||||
|
@ -69,8 +74,9 @@ func (s *Server) changePassword(w http.ResponseWriter, req *http.Request) {
|
|||
// unverified accounts here for simplicity.
|
||||
|
||||
var err error
|
||||
var userId auth.UserId
|
||||
if changePasswordRequest.EncryptedWallet != "" {
|
||||
err = s.store.ChangePasswordWithWallet(
|
||||
userId, err = s.store.ChangePasswordWithWallet(
|
||||
changePasswordRequest.Email,
|
||||
changePasswordRequest.OldPassword,
|
||||
changePasswordRequest.NewPassword,
|
||||
|
@ -83,7 +89,7 @@ func (s *Server) changePassword(w http.ResponseWriter, req *http.Request) {
|
|||
return
|
||||
}
|
||||
} else {
|
||||
err = s.store.ChangePasswordNoWallet(
|
||||
userId, err = s.store.ChangePasswordNoWallet(
|
||||
changePasswordRequest.Email,
|
||||
changePasswordRequest.OldPassword,
|
||||
changePasswordRequest.NewPassword,
|
||||
|
@ -107,6 +113,42 @@ func (s *Server) changePassword(w http.ResponseWriter, req *http.Request) {
|
|||
return
|
||||
}
|
||||
|
||||
// TODO - A socket connection request using an old auth token could still
|
||||
// succeed in a race condition:
|
||||
// * websocket handler: checkAuth passes with token
|
||||
// * password change handler: change password, invalidate token
|
||||
// * password change handler: send userRemove message
|
||||
// * websocket manager: process userRemove message, ending all websocket connections for user
|
||||
// * websocket handler: new websocket connection is established
|
||||
//
|
||||
// It would require the websocket handler to be very slow, but I don't want to
|
||||
// rule it out.
|
||||
//
|
||||
// But a much more likely scenario could happen: the buffer on the userRemove
|
||||
// channel could get full and it could time out, and not boot any of the
|
||||
// users' clients.
|
||||
//
|
||||
// These aren't horribly important now since the only message is a
|
||||
// notification that a new wallet version exists, but who knows what we
|
||||
// could use websockets for. Maybe we start doing something crazy like
|
||||
// updating the wallet over the channel, in which case we absolutely want
|
||||
// to prevent an old client from doing so after a password change on
|
||||
// another client.
|
||||
//
|
||||
// We'd have to think a fair amount about how to make these foolproof if it
|
||||
// becomes important. Maybe we just pass the auth token to the websocket
|
||||
// writer, and pass it to every wallet update db call, and have it check
|
||||
// the auth token within the same transaction as the wallet update.
|
||||
|
||||
timeout := time.NewTicker(100 * time.Millisecond)
|
||||
select {
|
||||
case s.userRemove <- wsClientForUser{userId, nil}:
|
||||
case <-timeout.C:
|
||||
metrics.ErrorsCount.With(prometheus.Labels{"details": "websocket user remove chan buffer full"}).Inc()
|
||||
return
|
||||
}
|
||||
timeout.Stop()
|
||||
|
||||
var changePasswordResponse struct{} // no data to respond with, but keep it JSON
|
||||
var response []byte
|
||||
response, err = json.Marshal(changePasswordResponse)
|
||||
|
@ -118,4 +160,5 @@ func (s *Server) changePassword(w http.ResponseWriter, req *http.Request) {
|
|||
|
||||
w.WriteHeader(http.StatusOK)
|
||||
fmt.Fprintf(w, string(response))
|
||||
log.Printf("User %s has changed their password", changePasswordRequest.Email)
|
||||
}
|
||||
|
|
|
@ -8,6 +8,7 @@ import (
|
|||
"net/http/httptest"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"lbryio/wallet-sync-server/auth"
|
||||
"lbryio/wallet-sync-server/server/paths"
|
||||
|
@ -25,6 +26,8 @@ func TestServerChangePassword(t *testing.T) {
|
|||
// Whether we expect the call to ChangePassword*Wallet to happen
|
||||
expectChangePasswordCall bool
|
||||
|
||||
expectWsMsg bool
|
||||
|
||||
// `new...` refers to what is being passed into the via POST request (and
|
||||
// what we expect to get passed into SetWallet for the *non-error* cases
|
||||
// below)
|
||||
|
@ -42,6 +45,7 @@ func TestServerChangePassword(t *testing.T) {
|
|||
expectedStatusCode: http.StatusOK,
|
||||
|
||||
expectChangePasswordCall: true,
|
||||
expectWsMsg: true,
|
||||
|
||||
newEncryptedWallet: "my-enc-wallet",
|
||||
newSequence: 2,
|
||||
|
@ -54,6 +58,7 @@ func TestServerChangePassword(t *testing.T) {
|
|||
expectedStatusCode: http.StatusOK,
|
||||
|
||||
expectChangePasswordCall: true,
|
||||
expectWsMsg: true,
|
||||
|
||||
email: "abc@example.com",
|
||||
}, {
|
||||
|
@ -168,8 +173,9 @@ func TestServerChangePassword(t *testing.T) {
|
|||
}
|
||||
for _, tc := range tt {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
testStore := TestStore{Errors: tc.storeErrors}
|
||||
s := Server{&TestAuth{}, &testStore, &TestEnv{}, &TestMail{}, TestPort}
|
||||
testStore := TestStore{Errors: tc.storeErrors, TestUserId: 37}
|
||||
s := Init(&TestAuth{}, &testStore, &TestEnv{}, &TestMail{}, TestPort)
|
||||
wsmm := wsMockManager{s: s, done: make(chan bool)}
|
||||
|
||||
// Whether we passed in wallet fields (these test cases should be passing
|
||||
// in all of them or none of them, so we only test EncryptedWallet). This
|
||||
|
@ -196,7 +202,15 @@ func TestServerChangePassword(t *testing.T) {
|
|||
req := httptest.NewRequest(http.MethodPost, paths.PathPassword, bytes.NewBuffer(requestBody))
|
||||
w := httptest.NewRecorder()
|
||||
|
||||
go wsmm.getOneMessage(100 * time.Millisecond)
|
||||
s.changePassword(w, req)
|
||||
<-wsmm.done
|
||||
if tc.expectWsMsg && wsmm.removedUserId != testStore.TestUserId {
|
||||
t.Error("Expected websocket message to remove user id")
|
||||
}
|
||||
if !tc.expectWsMsg && !wsmm.noMessage {
|
||||
t.Error("Expected no websocket message to remove user id")
|
||||
}
|
||||
|
||||
body, _ := ioutil.ReadAll(w.Body)
|
||||
|
||||
|
|
|
@ -11,6 +11,10 @@ const PathVerify = PathPrefix + "/verify"
|
|||
const PathResendVerify = PathPrefix + "/verify/resend"
|
||||
const PathClientSaltSeed = PathPrefix + "/client-salt-seed"
|
||||
|
||||
// Using such a generic name since, as I understand, we can do a bunch of
|
||||
// different stuff over this one websocket.
|
||||
const PathWebsocket = PathPrefix + "/websocket"
|
||||
|
||||
const PathUnknownEndpoint = PathPrefix + "/"
|
||||
const PathWrongApiVersion = "/api/"
|
||||
|
||||
|
|
102
server/server.go
102
server/server.go
|
@ -1,10 +1,13 @@
|
|||
package server
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log"
|
||||
"net/http"
|
||||
"os"
|
||||
"os/signal"
|
||||
"strings"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||
|
@ -14,26 +17,55 @@ import (
|
|||
"lbryio/wallet-sync-server/mail"
|
||||
"lbryio/wallet-sync-server/server/paths"
|
||||
"lbryio/wallet-sync-server/store"
|
||||
"lbryio/wallet-sync-server/wallet"
|
||||
)
|
||||
|
||||
const maxBodySize = 100000
|
||||
|
||||
// Message sent from the wallet POST request handler to the websocket manager,
|
||||
// indicating that a user's client should receive a (different) message that
|
||||
// their wallet has an update on the server.
|
||||
type walletUpdateMsg struct {
|
||||
userId auth.UserId
|
||||
sequence wallet.Sequence
|
||||
}
|
||||
|
||||
type Server struct {
|
||||
auth auth.AuthInterface
|
||||
store store.StoreInterface
|
||||
env env.EnvInterface
|
||||
mail mail.MailInterface
|
||||
port int
|
||||
|
||||
clientAdd chan wsClientForUser
|
||||
clientRemove chan wsClientForUser
|
||||
userRemove chan wsClientForUser
|
||||
walletUpdates chan walletUpdateMsg
|
||||
}
|
||||
|
||||
func Init(
|
||||
auth auth.AuthInterface,
|
||||
store store.StoreInterface,
|
||||
env env.EnvInterface,
|
||||
mail mail.MailInterface,
|
||||
authInterface auth.AuthInterface,
|
||||
storeInterface store.StoreInterface,
|
||||
envInterface env.EnvInterface,
|
||||
mailInterface mail.MailInterface,
|
||||
port int,
|
||||
) *Server {
|
||||
return &Server{auth, store, env, mail, port}
|
||||
return &Server{
|
||||
auth: authInterface,
|
||||
store: storeInterface,
|
||||
env: envInterface,
|
||||
mail: mailInterface,
|
||||
port: port,
|
||||
|
||||
// Anything that could get backed up by a lot of requests, let's just
|
||||
// give it a buffer. Starting small until we start to see dashboard
|
||||
// stats on this. I want a sense of how this grows with the number of
|
||||
// users or whatnot.
|
||||
clientAdd: make(chan wsClientForUser),
|
||||
clientRemove: make(chan wsClientForUser),
|
||||
userRemove: make(chan wsClientForUser, 5),
|
||||
walletUpdates: make(chan walletUpdateMsg, 5),
|
||||
}
|
||||
}
|
||||
|
||||
type ErrorResponse struct {
|
||||
|
@ -164,6 +196,22 @@ func (s *Server) checkAuth(
|
|||
return authToken
|
||||
}
|
||||
|
||||
// Useful for any request where token is the only GET param to get
|
||||
// TODO - There's probably a struct-based solution here like with POST/PUT.
|
||||
func getTokenParam(req *http.Request) (token auth.AuthTokenString, err error) {
|
||||
tokenSlice, hasTokenSlice := req.URL.Query()["token"]
|
||||
|
||||
if !hasTokenSlice || tokenSlice[0] == "" {
|
||||
err = fmt.Errorf("Missing token parameter")
|
||||
}
|
||||
|
||||
if err == nil {
|
||||
token = auth.AuthTokenString(tokenSlice[0])
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// TODO - both wallet and token requests should be PUT, not POST.
|
||||
// PUT = "...creates a new resource or replaces a representation of the target resource with the request payload."
|
||||
|
||||
|
@ -177,6 +225,14 @@ func (s *Server) wrongApiVersion(w http.ResponseWriter, req *http.Request) {
|
|||
return
|
||||
}
|
||||
|
||||
func serve(server *http.Server, done chan bool) {
|
||||
log.Print("Server start")
|
||||
server.ListenAndServe()
|
||||
log.Print("Server finish")
|
||||
|
||||
done <- true
|
||||
}
|
||||
|
||||
func (s *Server) Serve() {
|
||||
http.HandleFunc(paths.PathAuthToken, s.getAuthToken)
|
||||
http.HandleFunc(paths.PathWallet, s.handleWallet)
|
||||
|
@ -185,6 +241,7 @@ func (s *Server) Serve() {
|
|||
http.HandleFunc(paths.PathVerify, s.verify)
|
||||
http.HandleFunc(paths.PathResendVerify, s.resendVerifyEmail)
|
||||
http.HandleFunc(paths.PathClientSaltSeed, s.getClientSaltSeed)
|
||||
http.HandleFunc(paths.PathWebsocket, s.websocket)
|
||||
|
||||
http.HandleFunc(paths.PathUnknownEndpoint, s.unknownEndpoint)
|
||||
http.HandleFunc(paths.PathWrongApiVersion, s.wrongApiVersion)
|
||||
|
@ -192,5 +249,38 @@ func (s *Server) Serve() {
|
|||
http.Handle(paths.PathPrometheus, promhttp.Handler())
|
||||
|
||||
log.Printf("Serving at localhost:%d\n", s.port)
|
||||
http.ListenAndServe(fmt.Sprintf("localhost:%d", s.port), nil)
|
||||
|
||||
// Signal *to* socket manager that it should finish (we use server.Shutdown
|
||||
// to tell the server to finish)
|
||||
socketsFinish := make(chan bool)
|
||||
|
||||
// Signal *from* server and socket manager that they are done:
|
||||
serverDone := make(chan bool)
|
||||
socketsDone := make(chan bool)
|
||||
|
||||
go s.manageSockets(socketsDone, socketsFinish)
|
||||
|
||||
server := http.Server{Addr: fmt.Sprintf("localhost:%d", s.port)}
|
||||
go serve(&server, serverDone)
|
||||
|
||||
// Make sure that both the server and the websocket manager close properly on interrupt
|
||||
interrupt := make(chan os.Signal, 1)
|
||||
signal.Notify(interrupt, os.Interrupt)
|
||||
|
||||
// Wait for the interrupt signal
|
||||
<-interrupt
|
||||
|
||||
// Tell the server to finish and wait for it to do so. We want it to finish
|
||||
// to guarantee no more incoming sockets before we turn off the socket
|
||||
// manager.
|
||||
server.Shutdown(context.Background())
|
||||
<-serverDone
|
||||
|
||||
// The socket manager's cleanup procedure assumes that there will be no new
|
||||
// socket connections. Now that the server is done, no new socket
|
||||
// connections will be coming in, so we can close the socket manager.
|
||||
socketsFinish <- true
|
||||
<-socketsDone
|
||||
|
||||
log.Printf("All done")
|
||||
}
|
||||
|
|
|
@ -9,6 +9,7 @@ import (
|
|||
"net/http/httptest"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"lbryio/wallet-sync-server/auth"
|
||||
"lbryio/wallet-sync-server/server/paths"
|
||||
|
@ -131,6 +132,7 @@ type TestStore struct {
|
|||
Errors TestStoreFunctionsErrors
|
||||
|
||||
TestAuthToken auth.AuthToken
|
||||
TestUserId auth.UserId
|
||||
|
||||
TestEncryptedWallet wallet.EncryptedWallet
|
||||
TestSequence wallet.Sequence
|
||||
|
@ -203,7 +205,7 @@ func (s *TestStore) ChangePasswordWithWallet(
|
|||
encryptedWallet wallet.EncryptedWallet,
|
||||
sequence wallet.Sequence,
|
||||
hmac wallet.WalletHmac,
|
||||
) (err error) {
|
||||
) (auth.UserId, error) {
|
||||
s.Called.ChangePasswordWithWallet = ChangePasswordWithWalletCall{
|
||||
EncryptedWallet: encryptedWallet,
|
||||
Sequence: sequence,
|
||||
|
@ -213,7 +215,7 @@ func (s *TestStore) ChangePasswordWithWallet(
|
|||
NewPassword: newPassword,
|
||||
ClientSaltSeed: clientSaltSeed,
|
||||
}
|
||||
return s.Errors.ChangePasswordWithWallet
|
||||
return s.TestUserId, s.Errors.ChangePasswordWithWallet
|
||||
}
|
||||
|
||||
func (s *TestStore) ChangePasswordNoWallet(
|
||||
|
@ -221,14 +223,14 @@ func (s *TestStore) ChangePasswordNoWallet(
|
|||
oldPassword auth.Password,
|
||||
newPassword auth.Password,
|
||||
clientSaltSeed auth.ClientSaltSeed,
|
||||
) (err error) {
|
||||
) (auth.UserId, error) {
|
||||
s.Called.ChangePasswordNoWallet = ChangePasswordNoWalletCall{
|
||||
Email: email,
|
||||
OldPassword: oldPassword,
|
||||
NewPassword: newPassword,
|
||||
ClientSaltSeed: clientSaltSeed,
|
||||
}
|
||||
return s.Errors.ChangePasswordNoWallet
|
||||
return s.TestUserId, s.Errors.ChangePasswordNoWallet
|
||||
}
|
||||
|
||||
func (s *TestStore) GetClientSaltSeed(email auth.Email) (seed auth.ClientSaltSeed, err error) {
|
||||
|
@ -269,6 +271,35 @@ func expectErrorString(t *testing.T, body []byte, expectedErrorString string) {
|
|||
}
|
||||
}
|
||||
|
||||
type wsMockManager struct {
|
||||
s *Server
|
||||
done chan bool
|
||||
|
||||
addedClientUserId auth.UserId
|
||||
removedClientUserId auth.UserId
|
||||
removedUserId auth.UserId
|
||||
walletUpdateUserId auth.UserId
|
||||
noMessage bool
|
||||
}
|
||||
|
||||
func (m *wsMockManager) getOneMessage(timeout time.Duration) {
|
||||
t := time.NewTicker(timeout)
|
||||
select {
|
||||
case msg := <-m.s.clientAdd:
|
||||
m.addedClientUserId = msg.userId
|
||||
case msg := <-m.s.clientRemove:
|
||||
m.removedClientUserId = msg.userId
|
||||
case msg := <-m.s.userRemove:
|
||||
m.removedUserId = msg.userId
|
||||
case msg := <-m.s.walletUpdates:
|
||||
m.walletUpdateUserId = msg.userId
|
||||
case <-t.C:
|
||||
m.noMessage = true
|
||||
}
|
||||
t.Stop()
|
||||
m.done <- true
|
||||
}
|
||||
|
||||
func TestServerHelperCheckAuth(t *testing.T) {
|
||||
tt := []struct {
|
||||
name string
|
||||
|
@ -324,7 +355,7 @@ func TestServerHelperCheckAuth(t *testing.T) {
|
|||
Errors: tc.storeErrors,
|
||||
TestAuthToken: auth.AuthToken{Token: auth.AuthTokenString("seekrit"), Scope: tc.userScope},
|
||||
}
|
||||
s := Server{&TestAuth{}, &testStore, &TestEnv{}, &TestMail{}, TestPort}
|
||||
s := Init(&TestAuth{}, &testStore, &TestEnv{}, &TestMail{}, TestPort)
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
authToken := s.checkAuth(w, testStore.TestAuthToken.Token, tc.requiredScope)
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
"fmt"
|
||||
"log"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
|
||||
|
@ -53,22 +54,6 @@ func (s *Server) handleWallet(w http.ResponseWriter, req *http.Request) {
|
|||
}
|
||||
}
|
||||
|
||||
// TODO - There's probably a struct-based solution here like with POST/PUT.
|
||||
// We could put that struct up top as well.
|
||||
func getWalletParams(req *http.Request) (token auth.AuthTokenString, err error) {
|
||||
tokenSlice, hasTokenSlice := req.URL.Query()["token"]
|
||||
|
||||
if !hasTokenSlice || tokenSlice[0] == "" {
|
||||
err = fmt.Errorf("Missing token parameter")
|
||||
}
|
||||
|
||||
if err == nil {
|
||||
token = auth.AuthTokenString(tokenSlice[0])
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (s *Server) getWallet(w http.ResponseWriter, req *http.Request) {
|
||||
metrics.RequestsCount.With(prometheus.Labels{"method": "GET wallet"}).Inc()
|
||||
|
||||
|
@ -76,7 +61,7 @@ func (s *Server) getWallet(w http.ResponseWriter, req *http.Request) {
|
|||
return
|
||||
}
|
||||
|
||||
token, paramsErr := getWalletParams(req)
|
||||
token, paramsErr := getTokenParam(req)
|
||||
|
||||
if paramsErr != nil {
|
||||
// In this specific case, the error is limited to values that are safe to
|
||||
|
@ -160,4 +145,17 @@ func (s *Server) postWallet(w http.ResponseWriter, req *http.Request) {
|
|||
if walletRequest.Sequence == store.InitialWalletSequence {
|
||||
log.Printf("Initial wallet created for user id %d", authToken.UserId)
|
||||
}
|
||||
|
||||
// Inform the other clients over websockets. If we can't do it within 100
|
||||
// milliseconds, don't bother. It's a nice-to-have, not mission critical.
|
||||
// But, count the misses on the dashboard. If it happens a lot we should
|
||||
// probably increase the buffer on the notify chans for the clients. Those
|
||||
// will be a bottleneck within the socket manager.
|
||||
timeout := time.NewTicker(100 * time.Millisecond)
|
||||
select {
|
||||
case s.walletUpdates <- walletUpdateMsg{authToken.UserId, walletRequest.Sequence}:
|
||||
case <-timeout.C:
|
||||
metrics.ErrorsCount.With(prometheus.Labels{"details": "client notify chan buffer full"}).Inc()
|
||||
}
|
||||
timeout.Stop()
|
||||
}
|
||||
|
|
|
@ -9,6 +9,7 @@ import (
|
|||
"net/http/httptest"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"lbryio/wallet-sync-server/auth"
|
||||
"lbryio/wallet-sync-server/server/paths"
|
||||
|
@ -78,7 +79,7 @@ func TestServerGetWallet(t *testing.T) {
|
|||
}
|
||||
|
||||
testEnv := TestEnv{}
|
||||
s := Server{&testAuth, &testStore, &testEnv, &TestMail{}, TestPort}
|
||||
s := Init(&testAuth, &testStore, &testEnv, &TestMail{}, TestPort)
|
||||
|
||||
req := httptest.NewRequest(http.MethodGet, paths.PathWallet, nil)
|
||||
q := req.URL.Query()
|
||||
|
@ -135,6 +136,7 @@ func TestServerPostWallet(t *testing.T) {
|
|||
expectedStatusCode int
|
||||
expectedErrorString string
|
||||
expectSetWalletCall bool
|
||||
expectWsMsg bool
|
||||
|
||||
// This is getting messy, but in the case of validation failures, we don't
|
||||
// even get around to trying to get an auth token, since the token string is
|
||||
|
@ -155,6 +157,7 @@ func TestServerPostWallet(t *testing.T) {
|
|||
name: "success",
|
||||
expectedStatusCode: http.StatusOK,
|
||||
expectSetWalletCall: true,
|
||||
expectWsMsg: true,
|
||||
|
||||
// Simulates a situation where the existing sequence is 1, the new
|
||||
// sequence is 2.
|
||||
|
@ -225,18 +228,19 @@ func TestServerPostWallet(t *testing.T) {
|
|||
}
|
||||
for _, tc := range tt {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
|
||||
testAuth := TestAuth{}
|
||||
testStore := TestStore{
|
||||
TestAuthToken: auth.AuthToken{
|
||||
Token: auth.AuthTokenString("seekrit"),
|
||||
Scope: auth.ScopeFull,
|
||||
Token: auth.AuthTokenString("seekrit"),
|
||||
Scope: auth.ScopeFull,
|
||||
UserId: auth.UserId(37),
|
||||
},
|
||||
|
||||
Errors: tc.storeErrors,
|
||||
}
|
||||
|
||||
s := Server{&testAuth, &testStore, &TestEnv{}, &TestMail{}, TestPort}
|
||||
s := Init(&testAuth, &testStore, &TestEnv{}, &TestMail{}, TestPort)
|
||||
wsmm := wsMockManager{s: s, done: make(chan bool)}
|
||||
|
||||
requestBody := []byte(
|
||||
fmt.Sprintf(`{
|
||||
|
@ -252,7 +256,15 @@ func TestServerPostWallet(t *testing.T) {
|
|||
|
||||
// test handleWallet while we're at it, which is a dispatch for get and post
|
||||
// wallet
|
||||
go wsmm.getOneMessage(100 * time.Millisecond)
|
||||
s.handleWallet(w, req)
|
||||
<-wsmm.done
|
||||
if tc.expectWsMsg && wsmm.walletUpdateUserId != testStore.TestAuthToken.UserId {
|
||||
t.Error("Expected websocket message to update wallet")
|
||||
}
|
||||
if !tc.expectWsMsg && wsmm.walletUpdateUserId == testStore.TestAuthToken.UserId {
|
||||
t.Error("Expected no websocket message to update wallet")
|
||||
}
|
||||
|
||||
// Make sure we tried to get an auth based on the `token` param (whether or
|
||||
// not it was a valid `token`)
|
||||
|
|
285
server/websocket.go
Normal file
285
server/websocket.go
Normal file
|
@ -0,0 +1,285 @@
|
|||
package server
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"lbryio/wallet-sync-server/auth"
|
||||
"lbryio/wallet-sync-server/wallet"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
)
|
||||
|
||||
// Using this as a guide:
|
||||
// https://github.com/gorilla/websocket/blob/master/examples/chat/
|
||||
//
|
||||
// Skipping some things that seem like maybe overkill for a simple application,
|
||||
// given that this isn't mission critical, and given that I'm not sure what a
|
||||
// lot of it does. In particular the wsWriter ping stuff. But, we can add it if
|
||||
// the performance is bad.
|
||||
|
||||
const pongWait = 60 * time.Second
|
||||
const writeWait = 10 * time.Second
|
||||
|
||||
type wsClientNotifyType int
|
||||
|
||||
const (
|
||||
// The channel is closed (i.e. this is the zero-value), so the socket should
|
||||
// be closed too. We might not actually check for a closed channel using
|
||||
// this value, but it's here for completeness.
|
||||
wsClientNotifyFinish = wsClientNotifyType(iota)
|
||||
|
||||
// Inform the client about a wallet update
|
||||
wsClientNotifyUpdate
|
||||
)
|
||||
|
||||
// wsClientNotifyMsg is sent over wsClient.notify by the websocket manager
|
||||
type wsClientNotifyMsg struct {
|
||||
notifyType wsClientNotifyType
|
||||
sequence wallet.Sequence
|
||||
}
|
||||
|
||||
const notifyChanBuffer = 5 // Each client shouldn't be getting a lot of concurrent messages
|
||||
|
||||
// Given a wsClientNotifyMsg of type wsClientNotifyUpdate, turn it into an
|
||||
// appropriate message to the client to be sent over websocket
|
||||
func walletUpdateWSMessage(msg wsClientNotifyMsg) []byte {
|
||||
return []byte(fmt.Sprintf("wallet-update:%d", msg.sequence))
|
||||
}
|
||||
|
||||
// Poor man's debug log
|
||||
const debugWebsockets = false
|
||||
|
||||
func debugLog(format string, v ...any) {
|
||||
if debugWebsockets {
|
||||
log.Printf(format, v...)
|
||||
}
|
||||
}
|
||||
|
||||
// Represents a connection to a client.
|
||||
type wsClient struct {
|
||||
socket *websocket.Conn
|
||||
notify chan wsClientNotifyMsg
|
||||
}
|
||||
|
||||
// Each user with at least one actively connected client will have one of these
|
||||
// associated.
|
||||
type wsClientSet map[*wsClient]bool
|
||||
|
||||
// A message sent over a channel to indicate that the given client is
|
||||
// connecting or disconnecting for the given user.
|
||||
type wsClientForUser struct {
|
||||
userId auth.UserId
|
||||
client *wsClient
|
||||
}
|
||||
|
||||
var upgrader = websocket.Upgrader{} // use default options
|
||||
|
||||
// Just handle ping/pong
|
||||
func (s *Server) wsReader(userId auth.UserId, client *wsClient) {
|
||||
defer func() {
|
||||
// Since wsWriter is waiting on the notify channel, tell the manager to
|
||||
// close it. This will make wsWriter stop (if it hasn't already).
|
||||
s.clientRemove <- wsClientForUser{userId, client}
|
||||
client.socket.Close()
|
||||
|
||||
debugLog("Done with wsReader %+v", client)
|
||||
}()
|
||||
|
||||
client.socket.SetReadLimit(512)
|
||||
client.socket.SetReadDeadline(time.Now().Add(pongWait))
|
||||
client.socket.SetPongHandler(func(string) error { client.socket.SetReadDeadline(time.Now().Add(pongWait)); return nil })
|
||||
for {
|
||||
_, _, err := client.socket.ReadMessage()
|
||||
if err != nil {
|
||||
debugLog("wsReader: %s\n", err.Error())
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) wsWriter(userId auth.UserId, client *wsClient) {
|
||||
defer func() {
|
||||
// Whatever the cause of closure here, closing the socket (if it's not
|
||||
// closed already) will cause wsReader to stop (if it hasn't stopped
|
||||
// already) since it's waiting on the socket.
|
||||
client.socket.Close()
|
||||
|
||||
debugLog("Done with wsWriter %+v", client)
|
||||
}()
|
||||
|
||||
for notifyMsg := range client.notify {
|
||||
if notifyMsg.notifyType != wsClientNotifyUpdate {
|
||||
log.Printf("wsWriter: Got an unknown message type! %+v", notifyMsg)
|
||||
continue
|
||||
}
|
||||
debugLog("wsWriter: notify update")
|
||||
client.socket.SetWriteDeadline(time.Now().Add(writeWait))
|
||||
err := client.socket.WriteMessage(websocket.TextMessage, walletUpdateWSMessage(notifyMsg))
|
||||
if err != nil {
|
||||
debugLog("wsWriter: %s\n", err.Error())
|
||||
return // skip close message
|
||||
}
|
||||
}
|
||||
|
||||
// Not sure what the point of this is, given that this probably
|
||||
// wouldn't get triggered unless the socket already closed, but the
|
||||
// example did this.
|
||||
debugLog("wsWriter: sending CloseMessage")
|
||||
client.socket.SetWriteDeadline(time.Now().Add(writeWait))
|
||||
client.socket.WriteMessage(websocket.CloseMessage, []byte{})
|
||||
}
|
||||
|
||||
// This is the server endpoint that initiates a new websocket
|
||||
func (s *Server) websocket(w http.ResponseWriter, req *http.Request) {
|
||||
token, paramsErr := getTokenParam(req)
|
||||
|
||||
if paramsErr != nil {
|
||||
// In this specific case, the error is limited to values that are safe to
|
||||
// give to the user.
|
||||
errorJson(w, http.StatusBadRequest, paramsErr.Error())
|
||||
return
|
||||
}
|
||||
|
||||
authToken := s.checkAuth(w, token, auth.ScopeFull)
|
||||
|
||||
if authToken == nil {
|
||||
return
|
||||
}
|
||||
|
||||
upgrader.CheckOrigin = func(r *http.Request) bool { return true }
|
||||
|
||||
ws, err := upgrader.Upgrade(w, req, nil)
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
return
|
||||
}
|
||||
|
||||
client := wsClient{ws, make(chan wsClientNotifyMsg, notifyChanBuffer)}
|
||||
newClient := wsClientForUser{authToken.UserId, &client}
|
||||
s.clientAdd <- newClient
|
||||
|
||||
go s.wsReader(authToken.UserId, &client)
|
||||
go s.wsWriter(authToken.UserId, &client)
|
||||
|
||||
log.Println("Client Connected")
|
||||
}
|
||||
|
||||
func (s *Server) manageSockets(done chan bool, finish chan bool) {
|
||||
log.Println("Socket manager start")
|
||||
clientsByUser := make(map[auth.UserId]wsClientSet)
|
||||
|
||||
removeClient := func(userId auth.UserId, client *wsClient) {
|
||||
debugLog("removeClient %+v", client)
|
||||
if _, ok := clientsByUser[userId]; !ok {
|
||||
return
|
||||
}
|
||||
if _, ok := clientsByUser[userId][client]; !ok {
|
||||
return
|
||||
}
|
||||
|
||||
close(client.notify)
|
||||
delete(clientsByUser[userId], client)
|
||||
|
||||
if len(clientsByUser[userId]) == 0 {
|
||||
delete(clientsByUser, userId)
|
||||
}
|
||||
}
|
||||
|
||||
removeUser := func(userId auth.UserId) {
|
||||
debugLog("removeUser (which calls removeClient) %d", userId)
|
||||
|
||||
for client := range clientsByUser[userId] {
|
||||
removeClient(userId, client)
|
||||
}
|
||||
}
|
||||
|
||||
addClient := func(userId auth.UserId, client *wsClient) {
|
||||
debugLog("addClient %+v", client)
|
||||
if _, ok := clientsByUser[userId]; !ok {
|
||||
clientsByUser[userId] = make(wsClientSet)
|
||||
}
|
||||
clientsByUser[userId][client] = true
|
||||
}
|
||||
|
||||
manage:
|
||||
for {
|
||||
select {
|
||||
case msg := <-s.walletUpdates:
|
||||
for client := range clientsByUser[msg.userId] {
|
||||
select {
|
||||
case client.notify <- wsClientNotifyMsg{wsClientNotifyUpdate, msg.sequence}:
|
||||
default:
|
||||
log.Println("This is a bug: Channel was somehow closed but the manager has not (yet) received a clientRemove message.")
|
||||
|
||||
// The example program had this, but I don't see why.
|
||||
removeClient(msg.userId, client)
|
||||
}
|
||||
}
|
||||
case removedUser := <-s.userRemove:
|
||||
removeUser(removedUser.userId)
|
||||
case retiredClient := <-s.clientRemove:
|
||||
removeClient(retiredClient.userId, retiredClient.client)
|
||||
case newClient := <-s.clientAdd:
|
||||
addClient(newClient.userId, newClient.client)
|
||||
case <-finish:
|
||||
break manage
|
||||
}
|
||||
}
|
||||
|
||||
log.Println("Cleaning up sockets")
|
||||
|
||||
debugLog("Running any addClient messages that snuck in...")
|
||||
|
||||
// By the time the `finish` channel has triggered, the web server has shut
|
||||
// down, so we won't have any clientAdd events _triggered_ by this point.
|
||||
// However, one (or more, if we add a buffer later) may be in the queue, so
|
||||
// let's keep track of them here so we can close them. But we close the
|
||||
// clientAdd channel first so we can break out of this loop.
|
||||
|
||||
// We assume that the server is done writing at this point, thus it's safe
|
||||
// to close this channel here.
|
||||
close(s.clientAdd)
|
||||
for newClient := range s.clientAdd {
|
||||
addClient(newClient.userId, newClient.client)
|
||||
}
|
||||
|
||||
// Now that we know about every running client, just close all of the sockets
|
||||
// (double-closing seems to be safe, so we don't care about races here). But
|
||||
// if it takes more than 10 seconds for whatever reason, just bail.
|
||||
|
||||
ticker := time.NewTicker(10 * time.Second)
|
||||
go func() {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
log.Println("Giving up on closing remaining sockets cleanly.")
|
||||
|
||||
// This will signal to main to exit, which will end the program
|
||||
done <- true
|
||||
|
||||
log.Println("Socket manager impolite finish")
|
||||
}
|
||||
}()
|
||||
|
||||
debugLog("Closing sockets...")
|
||||
for _, userClients := range clientsByUser {
|
||||
for client := range userClients {
|
||||
debugLog("Closing socket for %+v", client)
|
||||
client.socket.SetWriteDeadline(time.Now().Add(writeWait))
|
||||
client.socket.WriteMessage(websocket.CloseMessage, []byte{})
|
||||
// TODO - wait for receiving the CloseMessage?
|
||||
client.socket.Close()
|
||||
debugLog("Closed socket for %+v", client)
|
||||
}
|
||||
}
|
||||
|
||||
// TODO - Do we need to wait for the sockets to actually close after
|
||||
// calling Close(), before exiting the program? Alternately: do they close
|
||||
// automatically anyway and I don't need this cleanup stuff in the first
|
||||
// place? (Probably doesn't automatically send the Close message at least.)
|
||||
|
||||
done <- true
|
||||
log.Println("Socket manager finish")
|
||||
}
|
35
server/websocket_test.go
Normal file
35
server/websocket_test.go
Normal file
|
@ -0,0 +1,35 @@
|
|||
package server
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestWebsocketManagerQuits(t *testing.T) {
|
||||
s := Init(&TestAuth{}, &TestStore{}, &TestEnv{}, &TestMail{}, TestPort)
|
||||
done := make(chan bool)
|
||||
finish := make(chan bool)
|
||||
|
||||
go s.manageSockets(done, finish)
|
||||
|
||||
select {
|
||||
case <-done:
|
||||
t.Fatal("Websocket handler shouldn't be done yet")
|
||||
default:
|
||||
}
|
||||
|
||||
finish <- true
|
||||
|
||||
ticker := time.NewTicker(100 * time.Millisecond)
|
||||
select {
|
||||
case <-done:
|
||||
case <-ticker.C:
|
||||
t.Fatal("Websocket handler should be done by now")
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// TODO Add some real tests. Making a meaningful test, given that we're dealing
|
||||
// with websockets here, is a real pain in the ass, and it's probably not the
|
||||
// highest priority right now. If websockets become higher profile we can work
|
||||
// on it again.
|
|
@ -43,9 +43,13 @@ func TestStoreChangePasswordSuccess(t *testing.T) {
|
|||
|
||||
lowerEmail := auth.Email(strings.ToLower(string(email)))
|
||||
|
||||
if err := s.ChangePasswordWithWallet(lowerEmail, oldPassword, newPassword, newSeed, encryptedWallet, sequence, hmac); err != nil {
|
||||
pwUserId, err := s.ChangePasswordWithWallet(lowerEmail, oldPassword, newPassword, newSeed, encryptedWallet, sequence, hmac)
|
||||
if err != nil {
|
||||
t.Errorf("ChangePasswordWithWallet (lower case email): unexpected error: %+v", err)
|
||||
}
|
||||
if userId != pwUserId {
|
||||
t.Errorf("Expected ChangePasswordWithWallet to return correct user Id. Want %d got %d", userId, pwUserId)
|
||||
}
|
||||
|
||||
expectAccountMatch(t, &s, email.Normalize(), email, newPassword, newSeed, nil, nil, time.Now().UTC(), time.Now().UTC())
|
||||
expectWalletExists(t, &s, userId, encryptedWallet, sequence, hmac, time.Now().UTC())
|
||||
|
@ -59,9 +63,13 @@ func TestStoreChangePasswordSuccess(t *testing.T) {
|
|||
|
||||
upperEmail := auth.Email(strings.ToUpper(string(email)))
|
||||
|
||||
if err := s.ChangePasswordWithWallet(upperEmail, newPassword, newNewPassword, newNewSeed, newEncryptedWallet, newSequence, newHmac); err != nil {
|
||||
pwUserId, err = s.ChangePasswordWithWallet(upperEmail, newPassword, newNewPassword, newNewSeed, newEncryptedWallet, newSequence, newHmac)
|
||||
if err != nil {
|
||||
t.Errorf("ChangePasswordWithWallet (upper case email): unexpected error: %+v", err)
|
||||
}
|
||||
if userId != pwUserId {
|
||||
t.Errorf("Expected ChangePasswordWithWallet to return correct user Id. Want %d got %d", userId, pwUserId)
|
||||
}
|
||||
|
||||
expectAccountMatch(t, &s, email.Normalize(), email, newNewPassword, newNewSeed, nil, nil, time.Now().UTC(), time.Now().UTC())
|
||||
}
|
||||
|
@ -165,7 +173,7 @@ func TestStoreChangePasswordErrors(t *testing.T) {
|
|||
newPassword := oldPassword + auth.Password("_new") // Make the new password different (as it should be)
|
||||
newSeed := auth.ClientSaltSeed("edf98765edf98765edf98765edf98765edf98765edf98765edf98765edf98765")
|
||||
|
||||
if err := s.ChangePasswordWithWallet(submittedEmail, submittedOldPassword, newPassword, newSeed, newEncryptedWallet, tc.sequence, newHmac); err != tc.expectedError {
|
||||
if _, err := s.ChangePasswordWithWallet(submittedEmail, submittedOldPassword, newPassword, newSeed, newEncryptedWallet, tc.sequence, newHmac); err != tc.expectedError {
|
||||
t.Errorf("ChangePasswordWithWallet: unexpected value for err. want: %+v, got: %+v", tc.expectedError, err)
|
||||
}
|
||||
|
||||
|
@ -204,9 +212,13 @@ func TestStoreChangePasswordNoWalletSuccess(t *testing.T) {
|
|||
|
||||
lowerEmail := auth.Email(strings.ToLower(string(email)))
|
||||
|
||||
if err := s.ChangePasswordNoWallet(lowerEmail, oldPassword, newPassword, newSeed); err != nil {
|
||||
pwUserId, err := s.ChangePasswordNoWallet(lowerEmail, oldPassword, newPassword, newSeed)
|
||||
if err != nil {
|
||||
t.Errorf("ChangePasswordNoWallet (lower case email): unexpected error: %+v", err)
|
||||
}
|
||||
if userId != pwUserId {
|
||||
t.Errorf("Expected ChangePasswordNoWallet to return correct user Id. Want %d got %d", userId, pwUserId)
|
||||
}
|
||||
|
||||
expectAccountMatch(t, &s, email.Normalize(), email, newPassword, newSeed, nil, nil, time.Now().UTC(), time.Now().UTC())
|
||||
expectWalletNotExists(t, &s, userId)
|
||||
|
@ -217,9 +229,14 @@ func TestStoreChangePasswordNoWalletSuccess(t *testing.T) {
|
|||
|
||||
upperEmail := auth.Email(strings.ToUpper(string(email)))
|
||||
|
||||
if err := s.ChangePasswordNoWallet(upperEmail, newPassword, newNewPassword, newNewSeed); err != nil {
|
||||
pwUserId, err = s.ChangePasswordNoWallet(upperEmail, newPassword, newNewPassword, newNewSeed)
|
||||
|
||||
if err != nil {
|
||||
t.Errorf("ChangePasswordNoWallet (upper case email): unexpected error: %+v", err)
|
||||
}
|
||||
if userId != pwUserId {
|
||||
t.Errorf("Expected ChangePasswordNoWallet to return correct user Id. Want %d got %d", userId, pwUserId)
|
||||
}
|
||||
|
||||
expectAccountMatch(t, &s, email.Normalize(), email, newNewPassword, newNewSeed, nil, nil, time.Now().UTC(), time.Now().UTC())
|
||||
}
|
||||
|
@ -308,7 +325,7 @@ func TestStoreChangePasswordNoWalletErrors(t *testing.T) {
|
|||
newPassword := oldPassword + auth.Password("_new") // Possibly make the new password different (as it should be)
|
||||
newSeed := auth.ClientSaltSeed("edf98765edf98765edf98765edf98765edf98765edf98765edf98765edf98765")
|
||||
|
||||
if err := s.ChangePasswordNoWallet(submittedEmail, submittedOldPassword, newPassword, newSeed); err != tc.expectedError {
|
||||
if _, err := s.ChangePasswordNoWallet(submittedEmail, submittedOldPassword, newPassword, newSeed); err != tc.expectedError {
|
||||
t.Errorf("ChangePasswordNoWallet: unexpected value for err. want: %+v, got: %+v", tc.expectedError, err)
|
||||
}
|
||||
|
||||
|
|
|
@ -53,8 +53,8 @@ type StoreInterface interface {
|
|||
CreateAccount(auth.Email, auth.Password, auth.ClientSaltSeed, *auth.VerifyTokenString) error
|
||||
UpdateVerifyTokenString(auth.Email, auth.VerifyTokenString) error
|
||||
VerifyAccount(auth.VerifyTokenString) error
|
||||
ChangePasswordWithWallet(auth.Email, auth.Password, auth.Password, auth.ClientSaltSeed, wallet.EncryptedWallet, wallet.Sequence, wallet.WalletHmac) error
|
||||
ChangePasswordNoWallet(auth.Email, auth.Password, auth.Password, auth.ClientSaltSeed) error
|
||||
ChangePasswordWithWallet(auth.Email, auth.Password, auth.Password, auth.ClientSaltSeed, wallet.EncryptedWallet, wallet.Sequence, wallet.WalletHmac) (auth.UserId, error)
|
||||
ChangePasswordNoWallet(auth.Email, auth.Password, auth.Password, auth.ClientSaltSeed) (auth.UserId, error)
|
||||
GetClientSaltSeed(auth.Email) (auth.ClientSaltSeed, error)
|
||||
}
|
||||
|
||||
|
@ -492,6 +492,22 @@ func (s *Store) VerifyAccount(verifyTokenString auth.VerifyTokenString) (err err
|
|||
// Also delete all auth tokens to force clients to update their root password
|
||||
// to get a new token. This prevents other clients from posting a wallet
|
||||
// encrypted with the old key.
|
||||
//
|
||||
// Return userId as a pure convenience for the calling request handler.
|
||||
//
|
||||
// TODO - A wallet encrypted with the old key could still save successfully in
|
||||
// a race condition:
|
||||
// 1) get auth token request passes old password check
|
||||
// 2) password change transaction begins and ends
|
||||
// 3) get auth token request saves and returns a new token
|
||||
// 4) post wallet using the auth token that snuck by
|
||||
// One obvious solution would be to integrate everything into one database
|
||||
// transaction. This problem could apply to other requests as well. Not just
|
||||
// database ones: there's a similar potential race condition trying to boot
|
||||
// users from all of their websockets on password change. We should think
|
||||
// about it. Maybe we could have a counter for password changes, similar to
|
||||
// Sequence? And the tokens have that number attached to it. We can check it
|
||||
// as an extra validation of the token.
|
||||
func (s *Store) ChangePasswordWithWallet(
|
||||
email auth.Email,
|
||||
oldPassword auth.Password,
|
||||
|
@ -500,7 +516,7 @@ func (s *Store) ChangePasswordWithWallet(
|
|||
encryptedWallet wallet.EncryptedWallet,
|
||||
sequence wallet.Sequence,
|
||||
hmac wallet.WalletHmac,
|
||||
) (err error) {
|
||||
) (userId auth.UserId, err error) {
|
||||
return s.changePassword(
|
||||
email,
|
||||
oldPassword,
|
||||
|
@ -518,12 +534,14 @@ func (s *Store) ChangePasswordWithWallet(
|
|||
// Also delete all auth tokens to force clients to update their root password
|
||||
// to get a new token. This prevents other clients from posting a wallet
|
||||
// encrypted with the old key.
|
||||
//
|
||||
// Return userId as a pure convenience for the calling request handler.
|
||||
func (s *Store) ChangePasswordNoWallet(
|
||||
email auth.Email,
|
||||
oldPassword auth.Password,
|
||||
newPassword auth.Password,
|
||||
clientSaltSeed auth.ClientSaltSeed,
|
||||
) (err error) {
|
||||
) (userId auth.UserId, err error) {
|
||||
return s.changePassword(
|
||||
email,
|
||||
oldPassword,
|
||||
|
@ -544,8 +562,7 @@ func (s *Store) changePassword(
|
|||
encryptedWallet wallet.EncryptedWallet,
|
||||
sequence wallet.Sequence,
|
||||
hmac wallet.WalletHmac,
|
||||
) (err error) {
|
||||
var userId auth.UserId
|
||||
) (userId auth.UserId, err error) {
|
||||
|
||||
tx, err := s.db.Begin()
|
||||
if err != nil {
|
||||
|
@ -646,8 +663,10 @@ func (s *Store) changePassword(
|
|||
}
|
||||
}
|
||||
|
||||
// Don't care how many I delete here. Might even be zero. No login token while
|
||||
// changing password seems plausible.
|
||||
// Don't care how many I delete here. Might even be zero (no login token
|
||||
// while changing password seems plausible). The main reason for this is
|
||||
// that we want to prevent any client from saving a subsequent wallet
|
||||
// without changing its password first.
|
||||
_, err = tx.Exec("DELETE FROM auth_tokens WHERE user_id=?", userId)
|
||||
return
|
||||
}
|
||||
|
|
|
@ -10,9 +10,11 @@ For this example we will be working with a locally running server so that we don
|
|||
|
||||
```
|
||||
>>> from test_client import Client
|
||||
>>> c1 = Client("joe2@example.com", "123abc2", 'test_wallet_1', local=True)
|
||||
>>> import time
|
||||
>>> email = "joe-%s@example.com" % int(time.time())
|
||||
>>> c1 = Client("c1", email, "123abc2", 'test_wallet_1', local=True)
|
||||
Connecting to Wallet API at http://localhost:8090
|
||||
>>> c2 = Client("joe2@example.com", "123abc2", 'test_wallet_2', local=True)
|
||||
>>> c2 = Client("c2", email, "123abc2", 'test_wallet_2', local=True)
|
||||
Connecting to Wallet API at http://localhost:8090
|
||||
```
|
||||
|
||||
|
@ -24,7 +26,7 @@ Generating keys...
|
|||
Done generating keys
|
||||
Registered
|
||||
>>> c1.salt_seed
|
||||
'1d52635c14b34f0fefcf86368d4e0b82e3555de9d3c93a6f22cd5500fd120c0d'
|
||||
'09f355b525ad440c9fdd0e95e5ac2c49708f84112772ea322c28311c53d55410'
|
||||
```
|
||||
|
||||
Set up the other client. See that it got the same salt seed from the server in the process, which it needs to make sure we have the correct encryption key and login password.
|
||||
|
@ -34,16 +36,16 @@ Set up the other client. See that it got the same salt seed from the server in t
|
|||
Generating keys...
|
||||
Done generating keys
|
||||
>>> c2.salt_seed
|
||||
'1d52635c14b34f0fefcf86368d4e0b82e3555de9d3c93a6f22cd5500fd120c0d'
|
||||
'09f355b525ad440c9fdd0e95e5ac2c49708f84112772ea322c28311c53d55410'
|
||||
```
|
||||
|
||||
Now that the account exists, grab an auth token with both clients.
|
||||
|
||||
```
|
||||
>>> c1.get_auth_token()
|
||||
Got auth token: e52f6e893fe3fa92d677d85f32e77357d68afd313c303a91d3af176ec684aa0d
|
||||
Got auth token: be540cea8c18d9de40255ab8da4cbc6d561512731f38e36e808903234453ba4d
|
||||
>>> c2.get_auth_token()
|
||||
Got auth token: b9fc2620990447d5f0305ecafc9f75e2a5f928a31bd86806aa8989567cad57d0
|
||||
Got auth token: 4cde9acc071e25a4fdebcd5a019d84d4f38c7093e4d15ddda5282d0ce136278d
|
||||
```
|
||||
|
||||
## Syncing
|
||||
|
@ -55,7 +57,7 @@ Create a new wallet + metadata (we'll wrap it in a struct we'll call `WalletStat
|
|||
>>> c1.update_remote_wallet()
|
||||
Successfully updated wallet state on server
|
||||
Synced walletState:
|
||||
WalletState(sequence=1, encrypted_wallet='czo4MTkyOjE2OjE6XBEQgEACPvxgUFW3MGnY9tG5VYh/Hx7iNG6DAX+q4zTbVZM17OQ/5D1+IOjxS7jxOB+dZmtxmo6qwGtizjc4+YBhNk/eKb+uIU8T6HQ4T3m+PiWpedLnBwF4RStPPBp1M2WNFTIZQPKirETPO3GqRQSzveB17A3iESqYTqHnGeE=')
|
||||
WalletState(sequence=1, encrypted_wallet='czo4MTkyOjE2OjE6FkHI5eVQvbw7vEouMyn766uHhWJBskmh7ZWNttSHh6Ro0ETPZXXuIuPMM+9/1e6Maq+CFBToS2mI/7Z6fuGdVRAGLXVNZKXY9g48J9R7TikFas/XE1Vs/bKLKGKAgvlhJCYvzg582z7JrMzaAQ26VJwFL0gbx+ey+wo7ukb5Yz4=')
|
||||
'Success'
|
||||
```
|
||||
|
||||
|
@ -67,7 +69,7 @@ Now, call `init_wallet_state` with the other client. Then, we call `get_remote_w
|
|||
>>> c2.init_wallet_state()
|
||||
>>> c2.get_remote_wallet()
|
||||
Got latest walletState:
|
||||
WalletState(sequence=1, encrypted_wallet='czo4MTkyOjE2OjE6XBEQgEACPvxgUFW3MGnY9tG5VYh/Hx7iNG6DAX+q4zTbVZM17OQ/5D1+IOjxS7jxOB+dZmtxmo6qwGtizjc4+YBhNk/eKb+uIU8T6HQ4T3m+PiWpedLnBwF4RStPPBp1M2WNFTIZQPKirETPO3GqRQSzveB17A3iESqYTqHnGeE=')
|
||||
WalletState(sequence=1, encrypted_wallet='czo4MTkyOjE2OjE6FkHI5eVQvbw7vEouMyn766uHhWJBskmh7ZWNttSHh6Ro0ETPZXXuIuPMM+9/1e6Maq+CFBToS2mI/7Z6fuGdVRAGLXVNZKXY9g48J9R7TikFas/XE1Vs/bKLKGKAgvlhJCYvzg582z7JrMzaAQ26VJwFL0gbx+ey+wo7ukb5Yz4=')
|
||||
'Success'
|
||||
```
|
||||
|
||||
|
@ -79,12 +81,12 @@ Push a new version, GET it with the other client. Even though we haven't edited
|
|||
>>> c2.update_remote_wallet()
|
||||
Successfully updated wallet state on server
|
||||
Synced walletState:
|
||||
WalletState(sequence=2, encrypted_wallet='czo4MTkyOjE2OjE6gL9aGNjy4U+6mBQZRzx+GS+/1dhl54+5sBzVtBQz51az7HQ3HFI2PjUL7XkeTcjdsaPEKh3eFTQwly9fNFKJIya5YvmtY8zhxe8FCqCkTITrn2EPwZFYXF6E3Wi1gLaPMpZlb2EXIZ1E7Gbg1Uxcpj+s1CB4ttjIZdnFwUrfAw4=')
|
||||
WalletState(sequence=2, encrypted_wallet='czo4MTkyOjE2OjE6c1dN5L/ywTpgtRIs35zwdPubo+rkXiuk6yoQdw63c/pW+X2NWuFhLo0sfITnRvjiyK20fIwGmBK2nnp2BllEV41HUHETitLL+HS5KK8+/ReNaqE/bZQs1pDn7K+RBl14heAPQQXLeFNFp9fNZxPGs1N+uLl0sRnhij+8kjS5p0Q=')
|
||||
'Success'
|
||||
>>> c1.get_remote_wallet()
|
||||
Nothing to merge. Taking remote walletState as latest walletState.
|
||||
Got latest walletState:
|
||||
WalletState(sequence=2, encrypted_wallet='czo4MTkyOjE2OjE6gL9aGNjy4U+6mBQZRzx+GS+/1dhl54+5sBzVtBQz51az7HQ3HFI2PjUL7XkeTcjdsaPEKh3eFTQwly9fNFKJIya5YvmtY8zhxe8FCqCkTITrn2EPwZFYXF6E3Wi1gLaPMpZlb2EXIZ1E7Gbg1Uxcpj+s1CB4ttjIZdnFwUrfAw4=')
|
||||
WalletState(sequence=2, encrypted_wallet='czo4MTkyOjE2OjE6c1dN5L/ywTpgtRIs35zwdPubo+rkXiuk6yoQdw63c/pW+X2NWuFhLo0sfITnRvjiyK20fIwGmBK2nnp2BllEV41HUHETitLL+HS5KK8+/ReNaqE/bZQs1pDn7K+RBl14heAPQQXLeFNFp9fNZxPGs1N+uLl0sRnhij+8kjS5p0Q=')
|
||||
'Success'
|
||||
```
|
||||
|
||||
|
@ -110,12 +112,12 @@ The wallet is synced between the clients. The client with the changed preference
|
|||
>>> c1.update_remote_wallet()
|
||||
Successfully updated wallet state on server
|
||||
Synced walletState:
|
||||
WalletState(sequence=3, encrypted_wallet='czo4MTkyOjE2OjE6JwVggGDjqoLy9YUqFXzIltph5bvO46SwJoAbLydlLg1mjfoXksGm9NsWbmYYmBoiXmiIbJPIsj8xfOjO5JlCH+EHSdyjCXizzwClYwgM4UD1+/ltuv1TH7H59cXd6Kztefn4y9IL/97rs+2DxDHM6cb/AdYGohIc3VaCmYBSbYRQFjTbQHaaScW6ntYuXAyE')
|
||||
WalletState(sequence=3, encrypted_wallet='czo4MTkyOjE2OjE6utvO9D4f/H8EAcRi1RrsxwnF556Mb/nE77yD98i2Ml75Y0FynfjJYho05n2l4RZDirV5Bm4KrXLH3D+qZh3hqW+ejTvMTUYJ2pLv9jNx7ZuNtPCF6tn+LaRur9sfy+jyuxmeHR9Wd7rqimg5pIBed2DLUTA5R5QaG7TEiMYTrUN+RrkYFdANEvh8LOgRDpce')
|
||||
'Success'
|
||||
>>> c2.get_remote_wallet()
|
||||
Nothing to merge. Taking remote walletState as latest walletState.
|
||||
Got latest walletState:
|
||||
WalletState(sequence=3, encrypted_wallet='czo4MTkyOjE2OjE6JwVggGDjqoLy9YUqFXzIltph5bvO46SwJoAbLydlLg1mjfoXksGm9NsWbmYYmBoiXmiIbJPIsj8xfOjO5JlCH+EHSdyjCXizzwClYwgM4UD1+/ltuv1TH7H59cXd6Kztefn4y9IL/97rs+2DxDHM6cb/AdYGohIc3VaCmYBSbYRQFjTbQHaaScW6ntYuXAyE')
|
||||
WalletState(sequence=3, encrypted_wallet='czo4MTkyOjE2OjE6utvO9D4f/H8EAcRi1RrsxwnF556Mb/nE77yD98i2Ml75Y0FynfjJYho05n2l4RZDirV5Bm4KrXLH3D+qZh3hqW+ejTvMTUYJ2pLv9jNx7ZuNtPCF6tn+LaRur9sfy+jyuxmeHR9Wd7rqimg5pIBed2DLUTA5R5QaG7TEiMYTrUN+RrkYFdANEvh8LOgRDpce')
|
||||
'Success'
|
||||
>>> c2.get_preferences()
|
||||
{'animal': 'cow', 'car': ''}
|
||||
|
@ -142,7 +144,7 @@ One client POSTs its change first.
|
|||
>>> c1.update_remote_wallet()
|
||||
Successfully updated wallet state on server
|
||||
Synced walletState:
|
||||
WalletState(sequence=4, encrypted_wallet='czo4MTkyOjE2OjE6xMKOvjQ9RBAWCac5Cj5d30YSI4PaMh3T+99fLdHKJC2RCcwrbhCurNIDBln6QJWCfa3gRp2/sY9k47XwZNsknCTrdIe4c3YJejvL/WCZTzoJ81m9QGbP/05DHQUV5c7z30taIESp4qOFwpSwYMB972gn6ZXOhn1iNDKSCLN3nSLHFnA0arjCAPQof//lJriz')
|
||||
WalletState(sequence=4, encrypted_wallet='czo4MTkyOjE2OjE65B/Ia0+1qF1fKqsAfTcf4tx6tJK95yCGcuCVRNk83URaksesq+qI/pqtPX2NOwX6y3KJhO75McHI1Wg5AJx3NkA/jV6VeFVHnAJu0UpkzqI1RtTMEn3w63VXQzLmmj+8PeEEdbY5f5/59fuUhTKfrPLTbr8dSvKIsKQC36d17oGhI/Ox8eQqS5QdjeoARENT')
|
||||
'Success'
|
||||
```
|
||||
|
||||
|
@ -154,7 +156,7 @@ Eventually, the client will be responsible (or at least more responsible) for me
|
|||
>>> c2.get_remote_wallet()
|
||||
Merging local changes with remote changes to create latest walletState.
|
||||
Got latest walletState:
|
||||
WalletState(sequence=4, encrypted_wallet='czo4MTkyOjE2OjE6xMKOvjQ9RBAWCac5Cj5d30YSI4PaMh3T+99fLdHKJC2RCcwrbhCurNIDBln6QJWCfa3gRp2/sY9k47XwZNsknCTrdIe4c3YJejvL/WCZTzoJ81m9QGbP/05DHQUV5c7z30taIESp4qOFwpSwYMB972gn6ZXOhn1iNDKSCLN3nSLHFnA0arjCAPQof//lJriz')
|
||||
WalletState(sequence=4, encrypted_wallet='czo4MTkyOjE2OjE65B/Ia0+1qF1fKqsAfTcf4tx6tJK95yCGcuCVRNk83URaksesq+qI/pqtPX2NOwX6y3KJhO75McHI1Wg5AJx3NkA/jV6VeFVHnAJu0UpkzqI1RtTMEn3w63VXQzLmmj+8PeEEdbY5f5/59fuUhTKfrPLTbr8dSvKIsKQC36d17oGhI/Ox8eQqS5QdjeoARENT')
|
||||
'Success'
|
||||
>>> c2.get_preferences()
|
||||
{'animal': 'horse', 'car': 'Audi'}
|
||||
|
@ -166,12 +168,12 @@ Finally, the client with the merged wallet pushes it to the server, and the othe
|
|||
>>> c2.update_remote_wallet()
|
||||
Successfully updated wallet state on server
|
||||
Synced walletState:
|
||||
WalletState(sequence=5, encrypted_wallet='czo4MTkyOjE2OjE6L+PCpF1qh1ayai/fqnc7kwa2eBJc1n6L6FuaLps8gdZhY9UdaBMc/BckvgUF9OXR7yOvndrFy73+5EzWxpmffBfZGqq42XjtbmHGScEERjuzra8UB2vLn+N2oe5s+e2O+7lJxPKYBD2pX4xKm3HjKqAso+D0MsWHMz9hqRLFekJfv5pVglUVkweW+h8yNxn1')
|
||||
WalletState(sequence=5, encrypted_wallet='czo4MTkyOjE2OjE6p7gs7r4Q6ky7/93vf/1e3HPXTKSAPP6c+0oxYl7010WKqkT0m9uVzU/YZW2QdCrPO4M99XiuxjpnbKwl42BnrvIkpfO1JHaDQHYTVZjp5W7UZHeevR8zQiyXWpEIDdTt1dwwKvpP7RHN1RqJzbyTCfJ6YqA7/GDiAojfDyz/5CvsIxcMCI+vTgYDwcy4rP36')
|
||||
'Success'
|
||||
>>> c1.get_remote_wallet()
|
||||
Nothing to merge. Taking remote walletState as latest walletState.
|
||||
Got latest walletState:
|
||||
WalletState(sequence=5, encrypted_wallet='czo4MTkyOjE2OjE6L+PCpF1qh1ayai/fqnc7kwa2eBJc1n6L6FuaLps8gdZhY9UdaBMc/BckvgUF9OXR7yOvndrFy73+5EzWxpmffBfZGqq42XjtbmHGScEERjuzra8UB2vLn+N2oe5s+e2O+7lJxPKYBD2pX4xKm3HjKqAso+D0MsWHMz9hqRLFekJfv5pVglUVkweW+h8yNxn1')
|
||||
WalletState(sequence=5, encrypted_wallet='czo4MTkyOjE2OjE6p7gs7r4Q6ky7/93vf/1e3HPXTKSAPP6c+0oxYl7010WKqkT0m9uVzU/YZW2QdCrPO4M99XiuxjpnbKwl42BnrvIkpfO1JHaDQHYTVZjp5W7UZHeevR8zQiyXWpEIDdTt1dwwKvpP7RHN1RqJzbyTCfJ6YqA7/GDiAojfDyz/5CvsIxcMCI+vTgYDwcy4rP36')
|
||||
'Success'
|
||||
>>> c1.get_preferences()
|
||||
{'animal': 'horse', 'car': 'Audi'}
|
||||
|
@ -202,7 +204,7 @@ We try to POST both of them to the server. The second one fails because of the c
|
|||
>>> c2.update_remote_wallet()
|
||||
Successfully updated wallet state on server
|
||||
Synced walletState:
|
||||
WalletState(sequence=6, encrypted_wallet='czo4MTkyOjE2OjE6HieQoVznUMTBF6x643Mg/AQUZadaikkiuRZsw3IaQsapK56WL3IBGrlemOjSH6uTfBWsWaLDMXEz+X7j5wqchSAt/wle2+I9dKgyDdFhWMOaEd61pT6r+lS8O8AbSKUJ6r5FSDgJRE/vz5l4xP/W9AVrK4l0u9ZqpvsKAet3UlfVV48cOnhwgPqlPoGBQ1xF')
|
||||
WalletState(sequence=6, encrypted_wallet='czo4MTkyOjE2OjE6JFuPERZ3SB2CiSzPISB5/KteOvn1348Xk1o+sgFQMO8l1woKG7pRFUH89L4cm9q/521lQsXgujwEW30X8XQnoTEC0VzaM7IWI7yug7uRQ1XhpXd5W97L3tTlAnZ+V+/lg76Auk9COkJYrbQ8raon3qNl0np9A8pQLvbxVTYAvoA8AMzMEUBFeuZoADZX6bJI')
|
||||
'Success'
|
||||
>>> c1.update_remote_wallet()
|
||||
Submitted wallet is out of date.
|
||||
|
@ -218,14 +220,14 @@ The client that is out of date will then call `get_remote_wallet`, which GETs an
|
|||
>>> c1.get_remote_wallet()
|
||||
Merging local changes with remote changes to create latest walletState.
|
||||
Got latest walletState:
|
||||
WalletState(sequence=6, encrypted_wallet='czo4MTkyOjE2OjE6HieQoVznUMTBF6x643Mg/AQUZadaikkiuRZsw3IaQsapK56WL3IBGrlemOjSH6uTfBWsWaLDMXEz+X7j5wqchSAt/wle2+I9dKgyDdFhWMOaEd61pT6r+lS8O8AbSKUJ6r5FSDgJRE/vz5l4xP/W9AVrK4l0u9ZqpvsKAet3UlfVV48cOnhwgPqlPoGBQ1xF')
|
||||
WalletState(sequence=6, encrypted_wallet='czo4MTkyOjE2OjE6JFuPERZ3SB2CiSzPISB5/KteOvn1348Xk1o+sgFQMO8l1woKG7pRFUH89L4cm9q/521lQsXgujwEW30X8XQnoTEC0VzaM7IWI7yug7uRQ1XhpXd5W97L3tTlAnZ+V+/lg76Auk9COkJYrbQ8raon3qNl0np9A8pQLvbxVTYAvoA8AMzMEUBFeuZoADZX6bJI')
|
||||
'Success'
|
||||
>>> c1.get_preferences()
|
||||
{'animal': 'beaver', 'car': 'Toyota'}
|
||||
>>> c1.update_remote_wallet()
|
||||
Successfully updated wallet state on server
|
||||
Synced walletState:
|
||||
WalletState(sequence=7, encrypted_wallet='czo4MTkyOjE2OjE6ypuX2e/wjiVZZVrLQEwoEuHZH7xhs6B3/awzxH/5WZITlKOo7TvV2Mjke/MSdTk2/YyWhfN8U0e4IwGxKW9VIpnF3ElEtEZxvJBklzDXeDNh5pWMgeZkBH5EempDQ6VzT0206z89EeiCK+3QSofUv7Ob90xNVUOdJq5/OBrG4LAGFh2ZVrh5KnqDm1+d8/ls')
|
||||
WalletState(sequence=7, encrypted_wallet='czo4MTkyOjE2OjE6ZnqsMEkm/XFvZzRQ1RL6WKw4CeGJeLFp49gnE2SlZYaMjtnels+Cl6EDE1vlGF42bju0lUFLEEVwa1JSweu11Q0IIZDKbjqDLChMCLLItf7aEAxJsZ4/3nu/N2BVCK8Hxz81p0UJiExUImI84Q4FWRqMPgnal8A/OeW7Qor9IyuqkuP054Lt3GnYeJYd3mfN')
|
||||
'Success'
|
||||
```
|
||||
|
||||
|
@ -243,7 +245,7 @@ Generating keys...
|
|||
Done generating keys
|
||||
Successfully updated password and wallet state on server
|
||||
Synced walletState:
|
||||
WalletState(sequence=8, encrypted_wallet='czo4MTkyOjE2OjE6Kd/DnozNDXYia8yYqrVI6OJ56tDAo5X4/Il+Ein/E6GRQ6K8/niK8Sjx1Cmpf7ecru14QS51pTwlFpS9mbwNE7CZ1wjAZHoLlL5B+dAECkSCFBHgBvq/29cXt6gG7KP+TLRLxZzGtgQRQiq6fsMBIIirw1ZCmpUNQP/PCHIJRfjJS0MNAGN8+srlPv+eUXIn')
|
||||
WalletState(sequence=8, encrypted_wallet='czo4MTkyOjE2OjE6bk6pmqtNnjBYgHvwo2wAVYDwD/Un0hTdbjSJBoRAJBAo6ZYk9VCjlEpPtrOo7UZiP7KgZpDI3KjI00dqG019krTalmuVKp3gXgojlK99Z9ZGSXQRK5baehq4cRmkuAaqDGysajRsVaWbPtNUgLM6Dw4FfoveUCmuATDrO+kv9Mg1tvs3a+IpMwdVXdxU2FEd')
|
||||
'Success'
|
||||
```
|
||||
|
||||
|
@ -251,7 +253,7 @@ We generate a new salt seed when we change the password
|
|||
|
||||
```
|
||||
>>> c1.salt_seed
|
||||
'155b6e8a9a8c9406844b6b0c4a40c3204ab1f06668470faa89e28aa89fefe3cf'
|
||||
'e081028900b64dffeb43b44c6e4beacce59510eeb78eea5df420297c77057586'
|
||||
```
|
||||
|
||||
This operation invalidates all of the user's auth tokens. This prevents other clients from accidentally pushing a wallet encrypted with the old password.
|
||||
|
@ -271,19 +273,19 @@ The client that changed its password can easily get a new token because it has t
|
|||
|
||||
```
|
||||
>>> c1.get_auth_token()
|
||||
Got auth token: 68a3db244e21709429e69e67352d02a3b26542c5ef2ac3377e19b17de71942d6
|
||||
Got auth token: ddbcbfb622b0b3653909f28f0b2df81e2f4c61b5f5316cf85a67e09e5e96b90c
|
||||
>>> c2.get_auth_token()
|
||||
Error 401
|
||||
b'{"error":"Unauthorized: No match for email and/or password"}\n'
|
||||
Failed to get the auth token. Do you need to update this client's password (set_local_password())?
|
||||
Failed to get the auth token. Do you need to verify your email address? Or update this client's password (set_local_password())?
|
||||
Or, in the off-chance the user changed their password back and forth, try updating secrets (update_derived_secrets()) to get the latest salt seed.
|
||||
>>> c2.set_local_password("eggsandwich")
|
||||
Generating keys...
|
||||
Done generating keys
|
||||
>>> c2.salt_seed
|
||||
'155b6e8a9a8c9406844b6b0c4a40c3204ab1f06668470faa89e28aa89fefe3cf'
|
||||
'e081028900b64dffeb43b44c6e4beacce59510eeb78eea5df420297c77057586'
|
||||
>>> c2.get_auth_token()
|
||||
Got auth token: 3917215675c5cc7fb5c5e24d583fddcd0a14c4370140e2274cf4c5da7eaae7bb
|
||||
Got auth token: f85b645bcd5d8bddd266cd95f6127dafec5f958ceab193bde0caf4767daafb71
|
||||
```
|
||||
|
||||
We don't allow password changes if we have pending wallet changes to push. This is to prevent a situation where the user has to merge local and remote changes in the middle of a password change.
|
||||
|
@ -296,16 +298,79 @@ Generating keys...
|
|||
Done generating keys
|
||||
Local changes found. Update remote wallet before changing password.
|
||||
'Failure'
|
||||
```
|
||||
|
||||
If we update the wallet first, we can do it.
|
||||
|
||||
```
|
||||
>>> c1.update_remote_wallet()
|
||||
Successfully updated wallet state on server
|
||||
Synced walletState:
|
||||
WalletState(sequence=9, encrypted_wallet='czo4MTkyOjE2OjE6Hspn+wbfHEzSv+1zsM/sFUaJJZuLLP7jLtCl3Ou3OQhXGEpkC0pP7WcbdGdQ+4foakTaB/y/b9All85rJ1ZiGWFnaK8SS9Rd7JT1UCEHs0BhN5+SfIK58yukIefzP39ZlSGUomE3eifOqso8C/gY2FltO96TS8WXx6czxqm6M/dvLk6q10LpODCQEH5auTA6')
|
||||
WalletState(sequence=9, encrypted_wallet='czo4MTkyOjE2OjE6BggwBmly+7071Vb6oEQSjviE3a3o/o0W34ikaRwggJv37p1D3SAFEBBtPNpc0FSTIltg5f0sxd55tSwcYGUWt3uPsZSqPOuMiFYPtUtMpzg2dAi9g42wSM+m9EdlYGpKxypijqzGZD/Ck7t7PE5eDKLAUxF/voJoOg3uJMGhyzq0+j68KuSofgvCGBTwJOro')
|
||||
'Success'
|
||||
>>> c1.change_password("starboard")
|
||||
Generating keys...
|
||||
Done generating keys
|
||||
Successfully updated password and wallet state on server
|
||||
Synced walletState:
|
||||
WalletState(sequence=10, encrypted_wallet='czo4MTkyOjE2OjE6Cnditb9t+rU56hfcMq6gW+lx1ek3TzyBZ4633FoiWCzTxIenbMyapolU0gnpWHasP8olOoL56LfSGVzP8eKG4JoRsU9VmOYXjkpY9QZCcKomVC4fJ17jPq/e2gJWDSv03pA1xbDhRpXRnZr3wd+37znTUyLpYzRDRAHpb2IGDi9FforobQRNcZUhx0DY8WIR')
|
||||
WalletState(sequence=10, encrypted_wallet='czo4MTkyOjE2OjE66qYzTZ+M37RQUfSRTKfVv9RvbqewMaQd6ZBsjZD+dvvjtXqcB6ZZ4q5K7Gd09VMkHRCP8tCYZ/48b/Yvt5sDiTcWSh/Ou08aaiUChPERKWNLpqOCTuF2B/DbsQPgCjwUiXNan+tW6rMbFM6QHxJ/FIzkJUsmab3nwYPon2xkRY8pnwj02R7J62Y7lMd7RiYV')
|
||||
'Success'
|
||||
>>> c1.get_auth_token()
|
||||
Got auth token: deb6a1a77ca601d01a75c6f150a767b6038e68cfb6cfd806333e5f5b6f03870f
|
||||
>>> c2.set_local_password("starboard")
|
||||
Generating keys...
|
||||
Done generating keys
|
||||
>>> c2.get_auth_token()
|
||||
Got auth token: d7fb866d4e77426a9c9c32f99a71bd73de53370ec4cc836e734461b4910ab66f
|
||||
```
|
||||
|
||||
# Websockets
|
||||
|
||||
A client can make a websocket connection to the server and receive notifications whenever another client updates the wallet on the server. The message will contain the sequence number so that the client can know whether they happen to be up to date. (The client that made the update will of course be up to date).
|
||||
|
||||
This test client will have a thread listening to the websocket which just prints info about new messages as they come in. A real client would likely choose to get the latest wallet from the server as soon as a messag ecomes through, assuming the sequence is newer than what the client has.
|
||||
|
||||
```
|
||||
>>> c1.start_websocket()
|
||||
>>> c2.start_websocket()
|
||||
```
|
||||
c1 connected for now
|
||||
c2 connected for now
|
||||
|
||||
Now make an update and see:
|
||||
|
||||
```
|
||||
>>> c1.update_remote_wallet()
|
||||
c1 got notified of a wallet update, sequence=11. If your client is behind this sequence, you should get the latest from the server.
|
||||
Successfully updated wallet state on server
|
||||
c2 got notified of a wallet update, sequence=11. If your client is behind this sequence, you should get the latest from the server.
|
||||
Synced walletState:
|
||||
WalletState(sequence=11, encrypted_wallet='czo4MTkyOjE2OjE6unyTAofXJh02g0GNXHYjNseaZVxmsRy8zVMBrgzLRKpw1gjmhEB+tsqFl+j1Boy0aqoKGoXFM/q0LZrrDx52elZZPTciTQ5w52IVbeWKe7eYPp2R6ZDsjxNTDfCuVz/CaE/k2uC1f/AW/d6BahTjUYH37EQ3RXEYpucpqnSFKye1PkhloVEu8w5AcnwWm8s/')
|
||||
'Success'
|
||||
```
|
||||
|
||||
Update again and we'll see the new sequence number:
|
||||
|
||||
```
|
||||
>>> c1.update_remote_wallet()
|
||||
c1 got notified of a wallet update, sequence=12. If your client is behind this sequence, you should get the latest from the server.
|
||||
c2 got notified of a wallet update, sequence=12. If your client is behind this sequence, you should get the latest from the server.
|
||||
Successfully updated wallet state on server
|
||||
Synced walletState:
|
||||
WalletState(sequence=12, encrypted_wallet='czo4MTkyOjE2OjE66PTstwDb8W2Zjz/UkhzypUN8lKAZ3zSW43I3Got4+XjHQnSd+ssJ2s+6OTlAcQxoC3ngtvpV445Hcs/IEDoX00BeYtR0y5Wh0PD2507UkYhaI58R8Ie4Bw/9z/9+DBTsMmfZ1qogV9n8bbMZNjpRYGlQMFMjWbIr58uflMoD81sKI6NUiz6dBnic2QAj6Ueu')
|
||||
'Success'
|
||||
```
|
||||
|
||||
When we change a password, just as all auth tokens are invalidated, all sockets are also disconnected.
|
||||
|
||||
```
|
||||
>>> c1.change_password("ihatesockets")
|
||||
Generating keys...
|
||||
Done generating keys
|
||||
Successfully updated password and wallet state on server
|
||||
Synced walletState:
|
||||
WalletState(sequence=13, encrypted_wallet='czo4MTkyOjE2OjE62yuQuJXS1yKOu3Bk/IKDwyMw0l5lZyVuYmmMzlIV+fzdGUy3pcY0pJYaz9mH664tknjqTTCIuKO1lCd+OXCByHNl6+FlE2pTNTTm0yDJoBbGzw0mKlABi1ERZAuBy3Q0GqAneKtlkikcbjgQV2KB+pHdN9bD/b0Qey0/Zf6TMCXpVEGeAPrtJiBUpgN2Nmeb')
|
||||
'Success'
|
||||
```
|
||||
c1 disconnected for now: code = 1005 (no status code [internal]), no reason
|
||||
c2 disconnected for now: code = 1005 (no status code [internal]), no reason
|
||||
|
|
|
@ -2,7 +2,6 @@
|
|||
|
||||
# Generate the README since I want real behavior interspersed with comments
|
||||
# Come to think of it, this is accidentally a pretty okay integration test for client and server
|
||||
# NOTE - delete the database before running this, or else you'll get an error for registering. also we want the wallet to start empty
|
||||
# NOTE - in the SDK, create wallets called "test_wallet_1" and "test_wallet_2"
|
||||
|
||||
import time
|
||||
|
@ -44,8 +43,10 @@ For this example we will be working with a locally running server so that we don
|
|||
|
||||
code_block("""
|
||||
from test_client import Client
|
||||
c1 = Client("joe2@example.com", "123abc2", 'test_wallet_1', local=True)
|
||||
c2 = Client("joe2@example.com", "123abc2", 'test_wallet_2', local=True)
|
||||
import time
|
||||
email = "joe-%s@example.com" % int(time.time())
|
||||
c1 = Client("c1", email, "123abc2", 'test_wallet_1', local=True)
|
||||
c2 = Client("c2", email, "123abc2", 'test_wallet_2', local=True)
|
||||
""")
|
||||
|
||||
print("""
|
||||
|
@ -263,6 +264,61 @@ We don't allow password changes if we have pending wallet changes to push. This
|
|||
code_block("""
|
||||
c1.set_preference('animal', 'leemur')
|
||||
c1.change_password("starboard")
|
||||
""")
|
||||
|
||||
print("""
|
||||
If we update the wallet first, we can do it.
|
||||
""")
|
||||
|
||||
code_block("""
|
||||
c1.update_remote_wallet()
|
||||
c1.change_password("starboard")
|
||||
c1.get_auth_token()
|
||||
c2.set_local_password("starboard")
|
||||
c2.get_auth_token()
|
||||
""")
|
||||
|
||||
print("""
|
||||
# Websockets
|
||||
|
||||
A client can make a websocket connection to the server and receive notifications whenever another client updates the wallet on the server. The message will contain the sequence number so that the client can know whether they happen to be up to date. (The client that made the update will of course be up to date).
|
||||
|
||||
This test client will have a thread listening to the websocket which just prints info about new messages as they come in. A real client would likely choose to get the latest wallet from the server as soon as a messag ecomes through, assuming the sequence is newer than what the client has.
|
||||
""")
|
||||
|
||||
code_block("""
|
||||
c1.start_websocket()
|
||||
c2.start_websocket()
|
||||
""")
|
||||
|
||||
time.sleep(.1) # make sure the messages in the other thread have time to print
|
||||
|
||||
print("""
|
||||
Now make an update and see:
|
||||
""")
|
||||
|
||||
code_block("""
|
||||
c1.update_remote_wallet()
|
||||
""")
|
||||
|
||||
time.sleep(.1) # make sure the messages in the other thread have time to print
|
||||
|
||||
print("""
|
||||
Update again and we'll see the new sequence number:
|
||||
""")
|
||||
|
||||
code_block("""
|
||||
c1.update_remote_wallet()
|
||||
""")
|
||||
|
||||
time.sleep(.1) # make sure the messages in the other thread have time to print
|
||||
|
||||
print("""
|
||||
When we change a password, just as all auth tokens are invalidated, all sockets are also disconnected.
|
||||
""")
|
||||
|
||||
code_block("""
|
||||
c1.change_password("ihatesockets")
|
||||
""")
|
||||
|
||||
time.sleep(.1) # make sure the messages in the other thread have time to print
|
||||
|
|
|
@ -4,9 +4,13 @@ import base64, json, uuid, requests, hashlib, hmac
|
|||
from pprint import pprint
|
||||
from hashlib import scrypt, sha256 # TODO - audit! Should I use hazmat `Scrypt` instead for some reason?
|
||||
import secrets
|
||||
import threading
|
||||
|
||||
WalletState = namedtuple('WalletState', ['sequence', 'encrypted_wallet'])
|
||||
|
||||
import asyncio, time
|
||||
from websockets import connect as websockets_connect
|
||||
|
||||
class LBRYSDK():
|
||||
@staticmethod
|
||||
def get_wallet(wallet_id, password):
|
||||
|
@ -68,20 +72,25 @@ class WalletSync():
|
|||
self.API_VERSION = 3
|
||||
|
||||
if local:
|
||||
BASE_URL = 'http://localhost:8090'
|
||||
BASE_HTTP_URL = 'http://localhost:8090'
|
||||
BASE_WS_URL = 'ws://localhost:8090'
|
||||
else:
|
||||
BASE_URL = 'https://dev.lbry.id'
|
||||
BASE_HTTP_URL = 'https://dev.lbry.id'
|
||||
BASE_WS_URL = 'wss://dev.lbry.id'
|
||||
|
||||
# Avoid confusion. I sometimes forget, at any rate.
|
||||
print ("Connecting to Wallet API at " + BASE_URL)
|
||||
print ("Connecting to Wallet API at " + BASE_HTTP_URL)
|
||||
|
||||
API_URL = BASE_URL + '/api/%d' % self.API_VERSION
|
||||
API_HTTP_URL = BASE_HTTP_URL + '/api/%d' % self.API_VERSION
|
||||
API_WS_URL = BASE_WS_URL + '/api/%d' % self.API_VERSION
|
||||
|
||||
self.AUTH_URL = API_URL + '/auth/full'
|
||||
self.REGISTER_URL = API_URL + '/signup'
|
||||
self.PASSWORD_URL = API_URL + '/password'
|
||||
self.WALLET_URL = API_URL + '/wallet'
|
||||
self.CLIENT_SALT_SEED_URL = API_URL + '/client-salt-seed'
|
||||
self.AUTH_URL = API_HTTP_URL + '/auth/full'
|
||||
self.REGISTER_URL = API_HTTP_URL + '/signup'
|
||||
self.PASSWORD_URL = API_HTTP_URL + '/password'
|
||||
self.WALLET_URL = API_HTTP_URL + '/wallet'
|
||||
self.CLIENT_SALT_SEED_URL = API_HTTP_URL + '/client-salt-seed'
|
||||
|
||||
self.WEBSOCKET_URL = API_WS_URL + '/websocket'
|
||||
|
||||
# def resend_registration_email():
|
||||
# also rename this to __init__.py later
|
||||
|
@ -115,7 +124,7 @@ class WalletSync():
|
|||
|
||||
def get_salt_seed(self, email):
|
||||
params = {
|
||||
'email': base64.encodestring(bytes(email.encode('utf-8'))),
|
||||
'email': base64.encodebytes(bytes(email.encode('utf-8'))),
|
||||
}
|
||||
response = requests.get(self.CLIENT_SALT_SEED_URL, params=params)
|
||||
|
||||
|
@ -219,6 +228,46 @@ class WalletSync():
|
|||
print (response.content)
|
||||
raise Exception("Unexpected status code")
|
||||
|
||||
# NOTE this doesn't have a way to explicitly disconnect! Hopefully the real
|
||||
# thing is designed better.
|
||||
# NOTE - if you change your password, the server will kick off all existing
|
||||
# websocket connections for that user. each client will need to change their
|
||||
# password to connect again.
|
||||
def start_websocket(self, client_name, token):
|
||||
DEBUG = False
|
||||
# Poor man's debug log
|
||||
debugLog = lambda *x: print(*x) if DEBUG else None
|
||||
self.try_connect_websocket = True
|
||||
async def connection():
|
||||
while self.try_connect_websocket:
|
||||
debugLog (client_name, "trying to connect")
|
||||
try:
|
||||
async with websockets_connect(self.WEBSOCKET_URL + "?token=" + token) as websocket:
|
||||
print (client_name, "connected for now")
|
||||
while True:
|
||||
try:
|
||||
msg = await websocket.recv()
|
||||
# ex: 'wallet-update:5'
|
||||
if msg.startswith('wallet-update'):
|
||||
sequence = int(msg.split(':')[-1])
|
||||
print (client_name, "got notified of a wallet update, sequence=" + str(sequence) + ". If your client is behind this sequence, you should get the latest from the server.")
|
||||
else:
|
||||
debugLog (client_name, "got an unknown message:", msg)
|
||||
except Exception as e:
|
||||
print (client_name, "disconnected for now:", e)
|
||||
time.sleep(1)
|
||||
break
|
||||
except Exception as e:
|
||||
debugLog (client_name, "failed to connect:", e)
|
||||
time.sleep(1)
|
||||
|
||||
asyncio.run(connection())
|
||||
|
||||
# NOTE - this only stops retrying connections and sending messages. If a
|
||||
# socket is happily connected this won't stop it.
|
||||
def stop_try_reconnect_websocket(self):
|
||||
self.try_connect_websocket = False
|
||||
|
||||
# Thanks to Standard Notes. See:
|
||||
# https://docs.standardnotes.com/specification/encryption/
|
||||
|
||||
|
@ -299,12 +348,13 @@ class Client():
|
|||
|
||||
return True
|
||||
|
||||
def __init__(self, email, root_password, wallet_id='default_wallet', local=False):
|
||||
def __init__(self, client_name, email, root_password, wallet_id='default_wallet', local=False):
|
||||
self.wallet_sync_api = WalletSync(local=local)
|
||||
self.client_name = client_name # Just for async output so we know who's talking
|
||||
|
||||
# Represents normal client behavior (though a real client will of course save device id)
|
||||
self.device_id = str(uuid.uuid4())
|
||||
self.auth_token = 'bad token'
|
||||
self.auth_token = 'bad-token'
|
||||
self.synced_wallet_state = None
|
||||
|
||||
self.email = email
|
||||
|
@ -312,6 +362,8 @@ class Client():
|
|||
|
||||
self.wallet_id = wallet_id
|
||||
|
||||
self.ws_thread = None
|
||||
|
||||
def register(self):
|
||||
# Note that for each registration, i.e. for each domain, we generate a
|
||||
# different salt seed.
|
||||
|
@ -394,6 +446,29 @@ class Client():
|
|||
# TODO - actually set the right hash
|
||||
self.mark_local_changes_synced_to_empty()
|
||||
|
||||
def start_websocket(self):
|
||||
# NOTE - Not putting any effort into here responsible thread programming
|
||||
# here. Not accounting for errors, logging out and logging into other
|
||||
# servers, etc. Only going so far as to make sure we don't start two at
|
||||
# once.
|
||||
|
||||
if self.ws_thread is None:
|
||||
self.ws_thread = threading.Thread(
|
||||
target=self.wallet_sync_api.start_websocket,
|
||||
args=(self.client_name, self.auth_token),
|
||||
daemon=True,
|
||||
)
|
||||
self.ws_thread.start()
|
||||
else:
|
||||
print("Websocket already connected (or trying to).")
|
||||
|
||||
def stop_try_reconnect_websocket(self):
|
||||
self.wallet_sync_api.stop_try_reconnect_websocket()
|
||||
|
||||
# Not trying to be a responsible thread programmer here, this is just a
|
||||
# demo, and not a threading demo
|
||||
self.ws_thread = None
|
||||
|
||||
def get_auth_token(self):
|
||||
token = self.wallet_sync_api.get_auth_token(
|
||||
self.email,
|
||||
|
|
Loading…
Reference in a new issue