Blender-Pipeline/network/multiuser_server.py
Jeison Yehuda Amihud (Blender Dumbass) dabea36fc4 Multiuser StartUp Polish
There was a sync problem at the startup of the server
It seems to be gone now that I fixed it. Please test.
2021-01-03 08:01:44 +00:00

526 lines
19 KiB
Python

# THIS FILE IS A PART OF VCStudio
# PYTHON 3
###############################################################################
# This is the SERVER program for the Multiuser system in the VCStudio.
# During production of "I'm Not Even Human" back in 2016 - 2018 we had multiple
# machines to work on the project. And back then some way of moving parts of the
# project between the machines was needed. Originally we were using a simple
# USB thumb drive. But quickly it became inpractical.
# Then I developed a little program called J.Y.Exchange. ( Python 2 ) Which
# was very cool, general purpose moving thing. And after this I introduced
# compatibility layer to J.Y.Exchange using Organizer.
# Later with the development of Blender-Organizer ( It's when I stopped using
# Gtk and wrote the first Organizer with custom UI ) I wanted to make a better
# sync function. And the history recording was a step into that direction.
# Tho unfortunatly. The sync function was never finished.
# This is an attempt so make a sync function with some extended functionality
# so in theory when a large studio wants to use VCStudio they would have a
# way to work with multiple users in the same time.
# CHALLENGES
# The main challenge of this system would be the sizes of the projects. See every
# asset, scene, shot, has a folder. In which you have hundreds of files. Take
# shots for example. Each has 4 render directories. Where the renderer puts
# single frames. 24 frames per second on a large project and we have 2 hundred
# thousand frames for a 2 hour movie. 8 hundred thousand frames if all 4 folders
# are filled up. And it's not counting all the blend files with all the revisions.
# Assets with textures and references. And other various stuff.
# This is not going to be wise to just send it over the network on every frame.
# We need a system of version contoll. Something that is a little
# more dynamic. And doesn't require scanning the entire project.
# HISTORY
# The idea behind history is to record what stuff are being changed. So we are
# sending only the moderatly small analytics file over the network. And if a user
# sees that other user has changed something. They can click a button to update
# this thing on their machine too.
# CONCEPT
# I think every item in the VCStudio. Meaning asset or shot. Every thing that we
# can access using the win.cur variable. Should be concidered as their own things
# but with some smartness added to it. For example. Since there are linked assets
# in blend files for the shots. When updating the shot. You should also update
# the assets.
# This server program is going to be the main allocator of recourses. The all
# knowing wizzard to which all the VCStudios will talk in order to get up to
# date information of who does what.
###############################################################################
import os
import sys
import time
import insure
import socket
import random
import datetime
import threading
import subprocess
# So the first thing we want to do is to make sure that we are talking to the
# right project. For this we are going to use the name of the project. As
# specified in the analytics data. Not by the folder name. Since multiple
# computers might have different folder names.
project_name = "VCStudio_Multiuser_Project_Name_Failed"
try:
project_name = sys.argv[1]
except:
pass
# Since it's a terminal application. That I'm totally suggest you run from the
# stand alone computer. Rather then from the UI of the Multiuser. We are going
# to treat it as a terminal program and print a bunch stuff to the terminal.
print("\n") # For when running using python3 run.py -ms
# Not at this moment I want to know what is my IP address. In the local network
# space there is.
ipget = subprocess.Popen(["hostname", "-I"],stdout=subprocess.PIPE, universal_newlines=True)
ipget.wait()
thisIP = ipget.stdout.read()[:-2]
# Please tell me if you see an easier methon of gathering the current IP.
# Before we go any fursther I think it's a good idea to actually initilize the
# server.
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(("", 64646))
sock.listen(0)
# Now since we are using threading. I need a way to manage them somehow. I love
# when 1 function can talk freely to another. So let's use a global dictionary
# of threads.
threads = {} # All of the connections will be each in it's own thread.
users = {} # This is the list of users and metadata about those users.
messages = [["Multiuser Server", "Server Started"]] # This is a list of messages sent by users.
assets = {} # This is a list of assets there is in the project.
story = ["0.0.0.0:0000", {}, "0000/00/00-00:00:00"] # The current story
# story | last modification time
analytics = [{}, "0000/00/00-00:00:00"] # The current analytics
# analytics | last modification time
def output(string):
# This is a fancy Print() function.
cs0 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
cs0.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
cs0.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
cs0.sendto(bytes("VCStudio MULTIUSER SERVER TERMINAL | "+string, 'utf-8'), ("255.255.255.255", 54545))
cs0.close()
seconds_format = "%H:%M:%S"
time = datetime.datetime.strftime(datetime.datetime.now(), seconds_format)
print(time+" | "+string)
output("multiuser server | started | "+thisIP+" | "+project_name)
# Okay let's define our MAIN. Because It seems like important to do here.
def main():
# This function is going to run in a loop. Unless user specifies to close it
# from the UI or by closing the terminal.
while True:
# Let's listen for connections.
client, ipport = sock.accept()
ip, port = ipport
userid = str(ip)+":"+str(port)
# Now I'm going to add the user into the main users dictionary
users[userid] = {
"client" :client,
"ip" :ip ,
"port" :port ,
"username":"" ,
"request" :[] , # The current request to this or all users
"asnswer" :[] ,
"camera" :[0,0] , # The camera in the story world
"pause" :False # I need this when a different thread is talking
# to the user
}
# And now let's call a thread for this user
threads[userid] = threading.Thread(target=user, args=(userid, ))
threads[userid].setDaemon(True) # So I could close the program
threads[userid].start()
def user(userid):
# Let's get the info about this user
client = users[userid]["client"]
ip = users[userid]["ip"]
port = users[userid]["port"]
username = users[userid]["username"]
# This function will run for every single connected user in it's own thread.
# Then we want to know client's Username.
insure.send(client, "username?")
data = insure.recv(client)
users[userid]["username"] = data
username = users[userid]["username"]
# Now let's check that this person is from our project.
insure.send(client, "project?")
data = insure.recv(client)
# If this is the wrong project. The client will be kicked out.
if data != project_name:
insure.send(client, "wrong_project")
client.close()
return
output(username+" | connected | "+userid)
# So now that the user is here. We can do some stuff with the user.
# And since the user is the user. And server is a server. Like in
# litteral sense of those 2 words. From now own user will be able
# to request a bunch of stuff. So...
request_all([0,"users"])
def get_story(client):
global story
insure.send(client, "story")
clients_story = insure.recv(client)
if clients_story[1] > story[2]:
story = [userid, clients_story[0], clients_story[1]]
users[userid]["camera"] = clients_story[0]["camera"].copy()
insure.send(client, [story[0], story[1]])
insure.recv(client)
def get_assets(client):
insure.send(client, "assets")
clients_assets = insure.recv(client)
# "/chr/Moria" : [userid, timestamp]
for asset in clients_assets:
if asset not in assets or assets[asset][1] < clients_assets[asset][1]:
assets[asset] = clients_assets[asset]
insure.send(client, assets)
insure.recv(client)
def get_analytics(client):
global analytics
insure.send(client, "analytics")
clients_analytics = insure.recv(client)
if clients_analytics[1] > analytics[1]:
analytics = clients_analytics
insure.send(client, analytics[0])
insure.recv(client)
get_story(client)
get_assets(client)
get_analytics(client)
insure.send(client, "yours")
while True:
try:
# Recieving users request
request = insure.recv(client)
if request == "yours":
###############################################################
# REQUESTS FROM ONE USER TO ANOTHER #
###############################################################
if len(users[userid]["request"]) > 1:
if users[userid]["request"][1] == "story":
get_story(client)
insure.send(client, "yours")
elif users[userid]["request"][1] == "analytics":
get_analytics(client)
insure.send(client, "yours")
elif users[userid]["request"][1] == "assets":
get_assets(client)
insure.send(client, "yours")
elif users[userid]["request"][1] == "users":
insure.send(client, "users")
elif users[userid]["request"][1] == "at":
insure.send(client, users[userid]["request"])
elif users[userid]["request"][1] == "message":
insure.send(client, ["messages",messages])
elif users[userid]["request"][0] == "serve":
serve(
users[userid]["request"][1],
userid,
users[userid]["request"][2]
)
# Clearing the request.
users[userid]["request"] = []
###########################################################
# AUTOMATIC REQUESTING #
###########################################################
if not assets:
request_all([0,"assets"])
print("assets requested")
# If there is nothing we want to tell the user that it's
# their turn to request.
else:
insure.send(client, "yours")
###############################################################
# REQUESTS FROM USER #
###############################################################
else:
if request == "story":
get_story(client)
request_all([userid, "story"])
elif request == "analytics":
get_analytics(client)
request_all([userid, "analytics"])
elif request[0] == "at":
output(userid+" | at | "+request[1]+" | time | "+request[2])
request_all([userid, "at", request[1], request[2]])
elif request == "users":
insure.send(client, users_list())
elif request[0] == "message":
messages.append([username, request[1]])
request_all([0, "message"])
elif request[0] == "get":
# If a user sends GET. It means that the user wants a
# folder from another user. This means that we need to
# communicate with 2 clients at ones. Which is kind a
# tricky.
try:
# There is a problem to just simply requesting a user
# directly. Because he might have something requested.
while users[request[1]]["request"]:
time.sleep(0.001)
users[request[1]]["request"] = ["serve", request[2], userid]
# The problem is that this thead will probably continue
# asking stuff from the server. Which is not good. We want
# to stop it here and now.
# Pause
users[userid]["pause"] = True
while users[userid]["pause"]:
time.sleep(0.001) # Funny but thread needs to do something or
# it pauses all of the threads. LOL.
# The other thread will unpause this thread when the
# folder is downloaded.
except Exception as e:
# Sometimes there is no user to get it from.
output(userid+" | get | error | "+str(e))
globals()["assets"] = {}
# Finishing the request and giving the user it's turn
insure.send(client, "yours")
except Exception as e:
# If the connection is lost. We want to delete the user.
request_all([0,"users"])
output(username+" | "+userid+" | "+str(e)+" | line: "+str(sys.exc_info()[-1].tb_lineno))
try:
del users[userid]
except Exception as e:
output("deleting user error | "+str(e))
# We want to clear the data of the assets if this happens
globals()["assets"] = {}
return
def users_list():
# This function will make users
U = {}
for user in list(users.keys()):
U[user] = {
"username":users[user]["username"],
"camera" :users[user]["camera"]
}
return U
def request_all(request):
for user in users:
if user != request[0]:
#while users[user]["request"]:
# time.sleep(0.001)
users[user]["request"] = request
def broadcast():
# This function will broadcast the IP address of the server to all VCStudio
# users. So the user experience would be straight forward. As clicking a button
# and not complex as knowing the IP and stuff. I mean yes. Good for you if you
# want to do everything manually. But most people are dumb.
cs1 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
cs1.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
cs1.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
message = "VCStudio MULTIUSER SERVER | "+thisIP+" | "+project_name
cs1.sendto(bytes(message, 'utf-8'), ("255.255.255.255", 54545))
cs1.close()
def listen():
# This function will listen to all commenications for any kind of abbort or
# extreme messages.
message = ""
# Let's try receiving messages from the outside.
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(("255.255.255.255", 54545))
sock.settimeout(0.05)
data, addr = sock.recvfrom(1024)
data = data.decode('utf8')
sock.close()
message = data
except:
pass
# Now let's close the serer if the message is VCStudio ABORT MULTIUSER
if message == "VCStudio ABORT MULTIUSER":
output("recieved abort message | closing")
exit()
def serve(folder, fromid, toid):
output("serving | "+folder+" | from | "+fromid+" | to | "+toid)
# Let's first of all get all the data that we need before starting the
# operation.
to = users[ toid ]["client"]
fr = users[fromid]["client"]
# Now let's tell our "fr" that we need the "folder" from him.
insure.send(fr, ["give", folder])
# The "fr" client should respond with a list of files / folder and their
# hashes. The is no need for us ho have this data. So let's pipe it
# directly to the "to" user.
insure.send(to, insure.recv(fr))
# Now we are going to retvieve the list of files that the user needs.
# We are going to save this list since we will be doing the connection of
# them transferring the files. And we need to know the length of it.
getlist = insure.recv(to)
insure.send(fr, getlist)
# Now let's serve the files.
for f in getlist:
print("serving file:",f)
insure.send(to, insure.recv(fr))
insure.send(fr, insure.recv(to))
# And finally let's release our "to".
users[toid]["pause"] = False
# Before we start the main loop I want to have the server loop too.
threads["server_listen"] = threading.Thread(target=main, args=())
threads["server_listen"].setDaemon(True) # So I could close the program
threads["server_listen"].start()
while True:
# This next stuff will be running in the main thread in the loop.
# First we are going to broadcast ourselves.
broadcast()
# Then we are going to listen for input from the Users. Because some kind of
# abbort operation could be called using UDP protocol. And we want to be
# able to hear it.
listen()