287 lines
		
	
	
		
			8.3 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
			
		
		
	
	
			287 lines
		
	
	
		
			8.3 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
| #!/usr/bin/env python3
 | |
| 
 | |
| import sacn
 | |
| import time
 | |
| import sys
 | |
| import yaml
 | |
| import math
 | |
| import random
 | |
| from util import fprint
 | |
| import platform    # For getting the operating system name
 | |
| import subprocess  # For executing a shell command
 | |
| from util import win32
 | |
| import cv2
 | |
| import numpy as np
 | |
| from uptime import uptime
 | |
| 
 | |
# Module-level state shared by every function in this file.
sender = None       # sACN sender object; created and started in init()
debug = True        # when true, map() also writes a scatter plot of the LED layout to map.png
config = None       # parsed contents of config.yml; loaded by map()
leds = None         # list of (x, y) positions indexed by LED number; built by map()
controllers = None  # list of (ledstart, ledend + 1, ip) tuples indexed by controller id; built by map()
data = None         # list of (r, g, b) tuples, one per LED; allocated by init(), written by setpixel()
start = uptime()    # uptime() reading taken at import; mapimage() resets it after each frame for pacing
 | |
| 
 | |
def ping(host):
    """Return True if ``host`` (str) answers a single ping request.

    Runs the system ``ping`` command once with a short reply timeout,
    discards everything it prints, and reports whether it exited with
    status 0.
    """
    # The count and timeout flags are spelled differently depending on the
    # ``win32`` flag; the timeout values are presumably 250 ms and 0.25 s
    # respectively -- confirm against the ping implementation in use.
    if win32:
        count_flag, wait_flag, wait_value = '-n', '-w', '250'
    else:
        count_flag, wait_flag, wait_value = '-c', '-W', '0.25'

    # Resulting command looks like: ping -c 1 -W 0.25 <host>
    exit_status = subprocess.call(
        ['ping', count_flag, '1', wait_flag, wait_value, host],
        stdout=subprocess.DEVNULL,
        stderr=subprocess.STDOUT,
    )
    return exit_status == 0
 | |
| 
 | |
def map():
    """Load ``config.yml`` and build the global LED and controller tables.

    Reads these keys from the YAML file:

    * ``led.map``: list of shapes. Only shapes whose ``type`` is
      ``"circle"`` are handled; each one uses ``size`` (number of LEDs),
      ``diameter``, ``start`` (index of its first LED), ``angle`` (offset in
      degrees) and ``pos`` (centre as ``[x, y]``). Other types are ignored.
    * ``led.controllers``: list of entries with ``id``, ``ledstart``,
      ``ledend`` and ``ip``.

    Results are stored in module globals rather than returned:

    * ``config``: the parsed YAML document.
    * ``leds``: list of ``(x, y)`` positions indexed by LED number. Indices
      not covered by any shape are set to ``(0, 0)`` after a warning.
    * ``controllers``: list indexed by controller id holding
      ``(ledstart, ledend + 1, ip)``; ids that never appear stay ``None``.

    When the module-level ``debug`` flag is true, a scatter plot of the
    layout (one series per controller) is saved to ``map.png``.

    NOTE: this function shadows the built-in ``map`` inside this module; the
    name is kept because callers depend on it.
    """
    global config
    global leds
    global controllers

    with open('config.yml', 'r') as fileread:
        config = yaml.safe_load(fileread)

    leds = list()
    controllers = list()
    for shape in config["led"]["map"]:
        if shape["type"] == "circle":
            size = shape["size"]
            anglediv = 360.0 / size
            radius = shape["diameter"] / 2
            first = shape["start"]

            # Grow the table so every index this circle writes exists.
            missing = first + size - len(leds)
            if missing > 0:
                leds.extend([None] * missing)

            # Place exactly `size` LEDs. Deriving each angle from the index
            # avoids deciding the LED count by accumulating float steps.
            for step in range(size):
                theta = (step * anglediv + shape["angle"]) * (math.pi / 180)
                leds[first + step] = (
                    math.cos(theta) * radius + shape["pos"][0],
                    math.sin(theta) * radius + shape["pos"][1],
                )

    # Any index left unassigned means the shapes do not tile the LED range.
    if any(position is None for position in leds):
        fprint("Warning: Imperfect LED map ordering. Hiding undefined lights.")
        leds = [(0, 0) if position is None else position for position in leds]

    # controller mapping
    for ctrl in config["led"]["controllers"]:
        missing = ctrl["id"] + 1 - len(controllers)
        if missing > 0:
            controllers.extend([None] * missing)
        # The end index is stored exclusive so it can be used directly in slices.
        controllers[ctrl["id"]] = (ctrl["ledstart"], ctrl["ledend"] + 1, ctrl["ip"])

    if debug:
        import matplotlib.pyplot as plt
        plt.axis('equal')
        for ctrl in controllers:
            if ctrl is None:
                # Controller id never configured; nothing to draw.
                continue
            points = leds[ctrl[0]:ctrl[1]]
            if points:
                # zip(*points) turns [(x, y), ...] into the xs and ys scatter expects.
                plt.scatter(*zip(*points), s=2)
        plt.savefig("map.png", dpi=600, bbox_inches="tight")
 | |
| 
 | |
def init():
    """Bring the whole LED system up.

    Steps, in order: build the LED/controller tables with map(), start the
    sACN sender, ping every controller until it answers or the configured
    ``led.timeout`` number of failed pings is reached, activate one output
    per controller (universe = controller index + 1), allocate the global
    pixel buffer ``data`` as all-black, then show a random start-up pattern
    for two seconds and fade it out with alloffsmooth().
    """
    global sender
    global data

    map()
    sender = sacn.sACNsender(fps=config["led"]["fps"], universeDiscovery=False)
    sender.start()  # start the sending thread

    for ctrl in controllers:
        address = ctrl[2]
        print("Waiting for the controller at", address, "to be online...", end="")
        failures = 0
        while not ping(address):
            failures += 1
            if failures >= config["led"]["timeout"]:
                fprint(" ERROR: controller still offline after " + str(failures) + " seconds, continuing...")
                break
        if failures < config["led"]["timeout"]:
            fprint(" done")

    for index, ctrl in enumerate(controllers):
        universe = index + 1
        print("Activating controller", index, "at", ctrl[2], "with", ctrl[1]-ctrl[0], "LEDs.")
        sender.activate_output(universe)  # start sending out data
        sender[universe].destination = ctrl[2]
    sender.manual_flush = True

    # initialize global pixel data list: one black pixel per LED
    data = [(0, 0, 0)] * len(leds)
    sendall(data)

    fprint("Running start-up test sequence...")
    for lednum in range(len(leds)):
        setpixel(random.random()*30, random.random()*30, 30+random.random()*225, lednum)
    sendall(data)
    time.sleep(2)
    alloffsmooth()
 | |
| 
 | |
def sendall(datain):
    """Send the complete pixel buffer to every controller with a manual flush.

    datain: sequence with one (R, G, B) tuple per LED, indexed by LED
    number. For each controller the slice ``[ledstart, ledend + 1)`` is
    flattened to ``[R, G, B, R, G, B, ...]`` and assigned to that
    controller's universe (controller index + 1).

    Switches the sender to manual-flush mode, flushes once, then pauses for
    two 2 ms sleeps.
    """
    sender.manual_flush = True
    for universe, (first, end, _ip) in enumerate(controllers, start=1):
        # Flatten in one linear pass; sum(tuples, ()) rebuilds the whole
        # tuple for every pixel and is quadratic in the slice length.
        sender[universe].dmx_data = [channel for pixel in datain[first:end] for channel in pixel]

    sender.flush()
    # The original author noted that extra flushes between these pauses made
    # delivery "100% reliable"; those flushes are disabled but the two
    # pauses are kept as they were.
    time.sleep(0.002)
    time.sleep(0.002)
 | |
| 
 | |
def fastsendall(datain):
    """Send the complete pixel buffer to every controller without pausing.

    datain: sequence with one (R, G, B) tuple per LED, indexed by LED
    number. For each controller the slice ``[ledstart, ledend + 1)`` is
    flattened to ``[R, G, B, R, G, B, ...]`` and assigned to that
    controller's universe (controller index + 1).

    Unlike sendall(), this sets ``sender.manual_flush`` to False before
    loading the data, still calls flush() once, and does not sleep.
    """
    sender.manual_flush = False
    for universe, (first, end, _ip) in enumerate(controllers, start=1):
        # Flatten in one linear pass; sum(tuples, ()) rebuilds the whole
        # tuple for every pixel and is quadratic in the slice length.
        sender[universe].dmx_data = [channel for pixel in datain[first:end] for channel in pixel]

    sender.flush()
 | |
| 
 | |
def senduniverse(datain, lednum):
    """Refresh only the universe(s) whose LED range contains ``lednum``.

    datain: sequence with one (R, G, B) tuple per LED, indexed by LED
    number (the full buffer, not just one controller's part).
    lednum: index of the LED that changed.

    Only controllers with ``ledstart <= lednum < ledend + 1`` get new
    channel data. flush() is then called without arguments, followed by a
    4 ms pause.
    """
    for universe, (first, end, _ip) in enumerate(controllers, start=1):
        if first <= lednum < end:
            # Flatten in one linear pass; sum(tuples, ()) rebuilds the whole
            # tuple for every pixel and is quadratic in the slice length.
            sender[universe].dmx_data = [channel for pixel in datain[first:end] for channel in pixel]

    sender.flush()
    time.sleep(0.004)
 | |
| 
 | |
def alloff():
    """Send an all-black frame to every controller.

    The global pixel buffer ``data`` is not modified; a temporary frame with
    one (0, 0, 0) entry per LED is built and passed to sendall().
    """
    blackout = [(0, 0, 0)] * len(leds)
    sendall(blackout)
 | |
| 
 | |
def allon():
    """Re-send the current contents of the global pixel buffer to all controllers."""
    sendall(data)
 | |
| 
 | |
def alloffsmooth():
    """Fade every LED to black one brightness step per frame, then switch off.

    Each frame lowers every channel of every pixel in the global buffer
    ``data`` by 1 (setpixel() clamps at 0) and sends the buffer. The fade
    works directly on ``data``, so the buffer is all zeros afterwards.

    At most 256 frames are sent. The loop stops as soon as the buffer is
    completely dark instead of re-sending identical black frames. A final
    alloff() makes sure the LEDs end up off.
    """
    for _ in range(256):
        if not any(any(pixel) for pixel in data):
            # Everything is already black; further frames would be identical.
            break
        for lednum, (r, g, b) in enumerate(data):
            setpixel(r - 1, g - 1, b - 1, lednum)
        sendall(data)

    alloff()
 | |
| 
 | |
def setpixelnow(r, g, b, num):
    """Set LED ``num`` to (r, g, b) in the global buffer and send it right away.

    Slight optimization over a full sendall(): only the universe containing
    the changed LED gets new data via senduniverse(). The original author
    notes that this sACN library offers no way to manually flush packets to
    just one controller.
    """
    setpixel(r, g, b, num)
    senduniverse(data, num)
 | |
| 
 | |
def setpixel(r, g, b, num):
    """Store a colour for LED ``num`` in the global pixel buffer.

    Each of r, g and b is constrained to the range 0..255 and truncated to
    an int before being written to ``data[num]`` as an (r, g, b) tuple.
    Nothing is sent to the controllers.
    """
    global data
    # Constrain first, then convert: values below 0 become 0, values above
    # 255 become 255, everything else passes through unchanged to int().
    constrained = [0 if value < 0 else 255 if value > 255 else value for value in (r, g, b)]
    data[num] = (int(constrained[0]), int(constrained[1]), int(constrained[2]))
 | |
| 
 | |
def close():
    """Stop the sACN sender after waiting half a second."""
    time.sleep(0.5)
    sender.stop()
 | |
| 
 | |
def mapimage(image, fps=60):
    """Sample ``image`` at every LED position and send the result to the LEDs.

    image: array indexed as ``image[y, x]`` whose pixels unpack into at
        least three channels; ``image.shape[0:2]`` is read as
        (height, width).
    fps: upper bound on the frame rate. The call blocks until at least
        ``1 / fps`` has passed since the previous call, measured with
        uptime().

    LED positions are shifted by the fixed offsets (+300, +340), then scaled
    so the largest shifted x and y each map onto the shorter image side.
    Each LED takes the colour of the pixel at its rounded position, divided
    by 4. LEDs that fall outside the image are set to black. The frame is
    also shown in an OpenCV window named "video", and the buffer is sent
    with fastsendall().
    """
    global start
    # Frame pacing: wait out the remainder of the frame period.
    while uptime() - start < 1/fps:
        time.sleep(0.0005)
    start = uptime()

    leds_adj = [(x + 300, y + 340) for x, y in leds]

    # Loop-invariant values, computed once per frame rather than once per
    # LED. The defaults are only used when there are no LEDs, in which case
    # nothing is divided.
    side = min(image.shape[0:2])
    max_x = max((x for x, _ in leds_adj), default=1)
    max_y = max((y for _, y in leds_adj), default=1)
    leds_normalized = [(x / max_x * side, y / max_y * side) for x, y in leds_adj]

    cv2.imshow("video", image)
    cv2.waitKey(1)

    fprint("Show frame")
    # The BGR->RGB conversion is disabled (the original comment notes that
    # OpenCV uses BGR by default), so channels are forwarded in the image's
    # own order.
    im_rgb = image
    height, width = im_rgb.shape[0], im_rgb.shape[1]
    for lednum, (fx, fy) in enumerate(leds_normalized):
        x, y = int(round(fx)), int(round(fy))

        # The lower bounds matter: a negative index would silently wrap
        # around to the opposite edge of the image.
        if 0 <= x < width and 0 <= y < height:
            color = tuple(im_rgb[y, x])
            setpixel(color[0]/4, color[1]/4, color[2]/4, lednum)
        else:
            setpixel(0, 0, 0, lednum)

    fastsendall(data)
 | |
| 
 | |
|     
 | |
if __name__ == "__main__":
    # Demo: play badapple.mp4 on the LEDs frame by frame.
    init()
    cap = cv2.VideoCapture('badapple.mp4')
    try:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                # End of the video, or the frame could not be read.
                break
            mapimage(frame)

        time.sleep(5)
    finally:
        # Release the video and stop the sender thread even when playback is
        # interrupted (exception or Ctrl-C).
        cap.release()
        close()