YouTube PTZ Control by Chatbot

Intro

To practice my Python skills I tend to focus on simple projects. So when I stumbled across a couple of Python libraries that let me control a Dahua PTZ camera and respond to YouTube live chat messages, I put two and two together. Why not let YouTube viewers of my Drierivierenpunt live stream interact with the camera by entering a few preset commands into the live chat console? The camera moves to the desired position or preset and gives that user control for 5 minutes, after which a ‘rest’ period of 15 minutes follows before the whole cycle starts all over again.

Initial Setup

Google Console

Head over to Google’s Developer Console and create a new project. For that project, enable the YouTube Data API v3 and create a Desktop application with UI. On the OAuth consent screen, give your application a name and set the scope to /auth/youtube (Manage your YouTube account).

On the Credentials page, click Create Credentials and then OAuth client ID. When finished, download the JSON file.

Python Libraries

Now that we’ve got our YouTube permissions in place, we can start building the application. Mind you, YouTube ‘only’ allows 10,000 quota units per day. Listing messages has a cost of 1 unit per call, while writing or deleting a message has a cost of 50. I’ll come back to this later in the tutorial.
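
To get a feel for what this budget means, here is a minimal sketch of the arithmetic, using only the numbers quoted above. The function names are illustrative, and both the rate of 30 units per minute (the usage measured later in this tutorial) and the 40 posted messages per day are example inputs, not fixed properties of the API.

```python
DAILY_QUOTA = 10_000  # units per day, as quoted above
LIST_COST = 1         # one call that lists chat messages
WRITE_COST = 50       # one call that posts or deletes a message


def units_used(list_calls, write_calls):
    """Total quota units consumed by a number of list and write calls."""
    return list_calls * LIST_COST + write_calls * WRITE_COST


def polling_hours(units_per_minute, write_calls=0):
    """Hours of polling that fit in the daily quota after reserving units for writes."""
    remaining = DAILY_QUOTA - write_calls * WRITE_COST
    return remaining / units_per_minute / 60


# Example inputs (assumptions): 30 units per minute of polling, 40 posted messages per day.
print(units_used(list_calls=120, write_calls=2))    # 120 * 1 + 2 * 50 = 220
print(round(polling_hours(30, write_calls=40), 2))  # (10000 - 2000) / 30 / 60 = 4.44
```

With these example numbers the quota lasts a bit more than four hours, so every posted message noticeably shortens the time the bot can keep polling.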

I personally use PyCharm for building Python applications, but you can edit the application with any text editor you prefer.

Download the required libraries.

Virtual Environment

First, create a virtual environment for the chatbot to run in:

sudo apt install -y python3-venv
mkdir myProjectname && cd myProjectname
python3 -m venv env
source env/bin/activate

python-youtubechat

Then download python-youtubechat. This library enables the application to log in to your YouTube channel and act as a chatbot for listing, sending or deleting messages.

git clone https://github.com/shughes-uk/python-youtubechat.git
cd python-youtubechat
python3 setup.py install

python-amcrest

Next download python-amcrest. This library enables the application to control your Amcrest or Dahua (PTZ) camera with functions like pan, tilt, zoom and go to preset.

pip3 install amcrest

imgur-python

Finally download imgur-python. We’ll use this library to upload snapshots taken from the camera directly to an album on Imgur.

pip3 install imgur-python

OAuth

In the project folder, rename the previously downloaded JSON file to client_secrets.json and execute get_auth_key.py:

python3 get_auth_key.py

This will launch a web browser and prompt you to log in to your YouTube channel. Accept the permissions, copy the auth key and paste it into the Python application. If successful, this creates the oauth_creds file, which will grant the application permission to manage your YouTube account, including reading and posting live chat messages (which is what we’re after).

To check that everything is set up correctly, we create a simple Python application:

from youtubechat import YoutubeLiveChat, get_live_chat_id_for_broadcast_id
from amcrest import AmcrestCamera

broadcast_id = "your-youtube-livestream-id"
print(broadcast_id)

livechat_id = get_live_chat_id_for_broadcast_id(broadcast_id,"oauth_creds")
# 80 is the usual HTTP port; replace it and the other placeholders with your camera's values
camera = AmcrestCamera('hostname', 80, 'username', 'password').camera

print(livechat_id)
print(camera.software_information)

You should see your YouTube live chat ID and the camera software information displayed. That means we’re ready to head on to the next step: binding it all together.

Python Script

Imports

We’ll start off by importing a few libraries:

from time import sleep
from datetime import datetime
import threading
from youtubechat import YoutubeLiveChat, get_live_chat_id_for_broadcast_id
from amcrest import AmcrestCamera
import json
from imgur_python import Imgur
from os import path

Variables

Then we set up a couple of variables: a list of the commands we want to look out for in the live chat console, the active user, and some booleans.

First the list of valid commands. These are the commands the user types into the YouTube live chat console; they trigger the Python script and eventually move the camera to the desired position. I opted for some basic PTZ commands like up and down, but also a few presets of the camera (like noord and veer).

valid_commands = ["!right", "!rechts", "!left", "!links", "!up", "!omhoog", "!down", "!naarbeneden", "!zoomin",
                  "!zoomout", "!drierivierenpunt", "!noord", "!papendrecht", "!ertepeller", "!zwijndrecht", "!veer"]

Next I set a few booleans to track whether a user is active and whether it’s their first (valid) message:

user_active = False
first_message = True

A string to hold the active user name

active_user_name = ''

A boolean to check if the messages thread is running

running = False

And finally a couple of timers, in seconds: timer_user is how long a user may control the camera, and timer_interval is the rest period before the next user can take control.

timer_interval = 900
timer_user = 300

PTZ Control

This is the part that actually controls the camera. It talks to the camera through the Amcrest library and moves the camera to the position the user requests through the respond function.

def ptz_control(msg):
    if msg.message_text.strip().lower() in ['!right', '!rechts']:
        camera.ptz_control_command(action="start", code="Right", arg1=0, arg2=4, arg3=0)
        sleep(1)
        camera.ptz_control_command(action="stop", code="Right", arg1=0, arg2=4, arg3=0)
        print("Right")
    elif msg.message_text.strip().lower() in ['!left', '!links']:
        camera.ptz_control_command(action="start", code="Left", arg1=0, arg2=4, arg3=0)
        sleep(1)
        camera.ptz_control_command(action="stop", code="Left", arg1=0, arg2=4, arg3=0)
        print("Left")
    elif msg.message_text.strip().lower() in ['!up', '!omhoog']:
        camera.ptz_control_command(action="start", code="Up", arg1=0, arg2=4, arg3=0)
        sleep(1)
        camera.ptz_control_command(action="stop", code="Up", arg1=0, arg2=4, arg3=0)
        print("Up")
    elif msg.message_text.strip().lower() in ['!down', '!naarbeneden']:
        camera.ptz_control_command(action="start", code="Down", arg1=0, arg2=4, arg3=0)
        sleep(1)
        camera.ptz_control_command(action="stop", code="Down", arg1=0, arg2=4, arg3=0)
        print("Down")
    elif msg.message_text.strip().lower() == '!zoomin':
        camera.zoom_in(True)
        sleep(1)
        camera.zoom_in(False)
    elif msg.message_text.strip().lower() == '!zoomout':
        camera.zoom_out(True)
        sleep(1)
        camera.zoom_out(False)
    elif msg.message_text.strip().lower() == '!drierivierenpunt':
        camera.go_to_preset(preset_point_number=1, channel=1)
        print("Drierivierenpunt")
    elif msg.message_text.strip().lower() == '!noord':
        camera.go_to_preset(preset_point_number=2, channel=1)
        print("Noord")
    elif msg.message_text.strip().lower() == '!papendrecht':
        camera.go_to_preset(preset_point_number=3, channel=1)
        print("Papendrecht")
    elif msg.message_text.strip().lower() == '!ertepeller':
        camera.go_to_preset(preset_point_number=4, channel=1)
        print("Ertepeller")
    elif msg.message_text.strip().lower() == '!zwijndrecht':
        camera.go_to_preset(preset_point_number=5, channel=1)
        print("Zwijndrecht")
    elif msg.message_text.strip().lower() == '!veer':
        camera.go_to_preset(preset_point_number=6, channel=1)
        print("Veer")
    elif msg.message_text.strip().lower() == '!tour':
        camera.tour(action=True, channel=1, tour_path_number=1)
        print("Tour")
    else:
        # Not a known command; nothing is sent to the camera
        print('Else')

A couple of things to note about this piece of code. How far the camera moves is determined by the time between the start and stop actions. So I first issue the start command, then sleep for one second, followed by a stop command. If you increase the sleep time, the camera will move further right/left/up/down. The same goes for zoom in and zoom out, except that those methods take a boolean (True to start, False to stop). Presets are set in the camera itself and can be accessed by referring to the preset_point_number.
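
The start, sleep, stop pattern repeats for every direction, so it can also be written as a lookup table. The following is a minimal sketch of that idea, not the code used on my stream: handle_command, MOVES, PRESETS and the RecordingCamera stand-in are hypothetical names, and RecordingCamera only records calls so the example runs without a real camera. Zoom is left out to keep it short.

```python
from time import sleep

# Chat command -> PTZ direction code, mirroring the if/elif chain above.
MOVES = {"!right": "Right", "!rechts": "Right", "!left": "Left", "!links": "Left",
         "!up": "Up", "!omhoog": "Up", "!down": "Down", "!naarbeneden": "Down"}
# Chat command -> preset number stored in the camera.
PRESETS = {"!drierivierenpunt": 1, "!noord": 2, "!papendrecht": 3,
           "!ertepeller": 4, "!zwijndrecht": 5, "!veer": 6}


def handle_command(camera, text, hold_seconds=1.0):
    """Send one chat command to the camera; return a label, or None if unknown."""
    command = text.strip().lower()
    if command in MOVES:
        code = MOVES[command]
        camera.ptz_control_command(action="start", code=code, arg1=0, arg2=4, arg3=0)
        sleep(hold_seconds)  # a longer hold means a larger movement
        camera.ptz_control_command(action="stop", code=code, arg1=0, arg2=4, arg3=0)
        return code
    if command in PRESETS:
        camera.go_to_preset(preset_point_number=PRESETS[command], channel=1)
        return "Preset %d" % PRESETS[command]
    return None


class RecordingCamera:
    """Hypothetical stand-in that records calls instead of moving a camera."""

    def __init__(self):
        self.calls = []

    def ptz_control_command(self, action, code, arg1, arg2, arg3):
        self.calls.append(("ptz", action, code))

    def go_to_preset(self, preset_point_number, channel):
        self.calls.append(("preset", preset_point_number, channel))


cam = RecordingCamera()
print(handle_command(cam, " !Links ", hold_seconds=0))  # Left
print(handle_command(cam, "!veer"))                     # Preset 6
print(cam.calls)
```

Adding a new direction alias or preset then only means adding one dictionary entry.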

Respond

The respond function, as the name implies, receives the messages entered in the YouTube live chat console every N seconds and runs each of them through a couple of if statements:

  • Does the entered message start with !snapshot
    • Get the date
    • Take a snapshot of the camera
    • Store it to a file
    • Set a few values for the picture (Title/Description/Album etc)
    • Upload snapshot to Imgur and get the response
    • Extract the link from the Imgur JSON response
    • Post a message to the YouTube live chat with the author’s name and the link to Imgur
  • Is the entered message a valid command and is there no user active
    • Set active user name to message author
    • Start the user countdown thread
    • Post a message to the YouTube live chat saying the user is active and has control of the camera
    • Send command to camera
    • Set active user to True
  • Is the entered message a valid command and is the message author the active user and is there a user active
    • Is it a first message from message author
      • Send command to camera
      • Set first message to False
    • Everything else will be accepted as valid and sent to the camera (ptz_control function)
  • Is the entered message a valid command and is the message author not the active user and is there a user active
    • Print ‘Invalid user’ to console
  • Did the entered message start with an ‘!’ but is not in valid commands
    • Print ‘Invalid command’ to console
  • Everything else, so just a chat message
    • Print ‘Just a chatmessage’ to console

def respond(msgs, chatid):
    global first_message
    global active_user_name
    global user_active

    for msg in msgs:
        print(msg)
        print(user_active)
        print(active_user_name)

        if msg.message_text.strip() in ['!snapshot', '!Snapshot']:
            date = datetime.now()
            now_us = date.strftime("%B/%-d/%Y")
            camera.snapshot(channel=1, path_file='images/snapshot.jpg')
            file = path.realpath('images/snapshot.jpg')
            title = 'Drierivierenpunt - Dordrecht'
            description = 'This picture has been taken via https://youtu.be/watch?v=rRy5G59ILpU' \
                          + ' on ' + now_us + ' by ' + msg.author.display_name + '.'
            album = '8C4Jp2A'

            response = imgur_client.image_upload(file, title, description, album)

            dump = json.dumps(response)
            json_loads = json.loads(dump)
            link = json_loads["response"]["data"]["link"]
            print(link)
            chat_obj.send_message('Snapshot taken by ' + msg.author.display_name + '. Link: ' +
                                  link, chatid)
            print("Snapshot taken")
        elif msg.message_text.strip() in valid_commands and not user_active:
            print(msg.author.display_name)
            active_user_name = msg.author.display_name
            print('Countdown user start')
            user_thread = threading.Thread(target=start_countdown_user)
            user_thread.start()
            ptz_control(msg)
            chat_obj.send_message(active_user_name + ' has control of the camera for ' + str(round(timer_user / 60))
                                  + ' minutes.', chatid)
            user_active = True
        elif msg.message_text.strip() in valid_commands and msg.author.display_name == active_user_name and user_active:
            if first_message:
                ptz_control(msg)
                first_message = False
            else:
                ptz_control(msg)
        elif msg.message_text.strip() in valid_commands and msg.author.display_name != active_user_name and user_active:
            print('Invalid user')
        elif msg.message_text.strip().startswith("!") and msg.message_text.strip() not in valid_commands:
            print('Invalid command')
        else:
            print('Just a chatmessage')

So let’s say a command is valid, a user is active and the message author is that active user: this triggers the ptz_control function and passes it the message, which in turn moves the camera.
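
The branching in respond can also be read as a small decision function. The sketch below mirrors the checks in the same order but only returns a label, so it can be tried without YouTube or a camera. The name classify and the labels are illustrative and not part of the script above.

```python
def classify(text, author, valid_commands, active_user, user_active):
    """Return what the respond function would do with one chat message."""
    command = text.strip()
    if command in ("!snapshot", "!Snapshot"):
        return "snapshot"
    if command in valid_commands:
        if not user_active:
            return "take_control"   # nobody is active: the author becomes the active user
        if author == active_user:
            return "control"        # the active user sends another command
        return "invalid_user"       # somebody else tries to steer the camera
    if command.startswith("!"):
        return "invalid_command"
    return "chat"


commands = ["!left", "!right"]
print(classify("!left", "anna", commands, "", False))     # take_control
print(classify("!left", "bob", commands, "anna", True))   # invalid_user
print(classify("hello", "bob", commands, "anna", True))   # chat
```

Keeping the decision separate from the side effects (posting messages, moving the camera) makes it easier to check each case before going live.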

Imgur

To upload the snapshot from the camera to Imgur we need to create the Imgur object. For that to work you need to register your application with Imgur. Head over to the Imgur API page, create an application without a callback URL, and write down your client_id and client_secret. Next we need to run a small separate Python script to retrieve the values required to create our Imgur object in the main program.

import webbrowser
from imgur_python import Imgur

imgur_client = Imgur({'client_id': 'cf8c57ca8......'})
auth_url = imgur_client.authorize()
webbrowser.open(auth_url)

Replace client_id with your own client_id and execute the script. This will launch a web browser which asks you to authorize the application and then responds with a bunch of values in the address bar. Copy those and paste them into a text editor. We will need them later.
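
Instead of picking the values apart by hand, you can let Python split the address for you. This is a minimal sketch that assumes the values arrive as key=value pairs after a # (or a ?) in the address; the function name and the example address are made up, and real tokens are much longer.

```python
from urllib.parse import parse_qs, urlsplit


def credentials_from_redirect(url):
    """Collect key=value pairs from the fragment (or query string) of a redirect URL."""
    parts = urlsplit(url)
    raw = parts.fragment or parts.query
    return {key: values[0] for key, values in parse_qs(raw).items()}


# Hypothetical address-bar contents, for illustration only.
example = ("https://example.com/#access_token=abc&expires_in=3600&token_type=bearer"
           "&refresh_token=def&account_username=SomeImgurUserName&account_id=123")
creds = credentials_from_redirect(example)
print(creds["access_token"])  # abc
print(creds["account_id"])    # 123 (a string; convert with int() if needed)
```

The resulting dictionary has the same keys as the one used to create the Imgur object, apart from client_id and client_secret, which you add yourself.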

Now that we’ve got all the required credentials, we can go ahead and create the Imgur object in the main program. Replace all the values with the ones obtained previously.

imgur_client = Imgur({
    "client_id": "cf8c57ca8......",
    "client_secret": "074f7d98ab3c................",
    "access_token": "9ffc44f8b5547......................",
    "expires_in": "696960000",
    "token_type": "bearer",
    "refresh_token": "2b7f29021b...................",
    "account_username": "SomeImgurUserName",
    "account_id": 146....
})

When successful, we should be able to instantiate the object and get a response from Imgur. Try that by executing this small script.

from imgur_python import Imgur

imgur_client = Imgur({
    "client_id": "cf8c57ca8......",
    "client_secret": "074f7d98ab3c................",
    "access_token": "9ffc44f8b5547......................",
    "expires_in": "696960000",
    "token_type": "bearer",
    "refresh_token": "2b7f29021b...................",
    "account_username": "SomeImgurUserName",
    "account_id": 146....
})

page = 0
images = imgur_client.images(page)
print(images)

You should see a response in JSON displaying all of your Imgur images.
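
The respond function shown earlier reads the image link from response["response"]["data"]["link"]. Here is a minimal sketch of doing that a bit more defensively, assuming the upload response really has that nesting. The helper name and the sample dictionary are made up.

```python
def extract_link(response):
    """Return the image link from an upload response, or None if it is missing."""
    data = response.get("response", {}).get("data", {})
    return data.get("link")


# Made-up sample with the same nesting the respond function relies on.
sample = {"response": {"data": {"link": "https://i.imgur.com/example.jpg"}}}
print(extract_link(sample))  # https://i.imgur.com/example.jpg
print(extract_link({}))      # None
```

Returning None instead of raising a KeyError lets the chatbot skip the chat message when an upload fails, rather than crashing the respond loop.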

Timers

I’m going to use timers to limit how long a user can control the camera. After that timer finishes, another timer kicks in to prevent back-to-back control of the camera, so that it gets some sort of ‘quiet period’ before the next user takes control. The camera itself is configured so that after three minutes of inactivity it falls back to its default mode, which in my case is scan: moving slowly from left to right and back again.

Countdown Timer

As I mentioned, I have two countdown timers. The first one sets the maximum time a user can control the camera (5 minutes).

def start_countdown_user():

    global user_active
    global active_user_name
    global timer_user
    global timer_interval

    sleep(timer_user)
    print('Countdown user finished')
    chat_obj.send_message(active_user_name + ' does not have control over the camera any more. '
            'No control possible for ' + str(round(timer_interval / 60)) + ' minutes.', livechat_id)
    sleep(10)
    print('chat_obj.pauze()')
    chat_obj.pauze()
    print('Going to sleep for ' + str(round(timer_interval / 60)) + ' minutes')
    sleep(timer_interval)
    print('Setting timer back to ' + str(round(300 / 60)) + ' minutes')
    timer_user = 300
    print('Setting user to inactive')
    user_active = False
    print('Setting username to null')
    active_user_name = ''
    print('Setting timer back to ' + str(round(900 / 60)) + ' minutes')
    timer_interval = 900
    print('Going back to start()')
    start()

In the code above we can see two things happening. First, timer_user is set to 300 seconds, so after 5 minutes the rest of the code is executed: it posts a message saying that the user no longer has control and that control is not available for the duration of the interval timer. We then sleep for 10 seconds to give the chat object time to post the message. Second, we pause the ytchat thread with chat_obj.pauze() so that it won’t check for messages (which saves precious credits from Google), followed by a sleep of timer_interval (900 seconds). When that timer finishes, the variables are restored and the timers are reset. Finally we go back to the start function.
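
The function above keeps track of the control and rest periods by sleeping in a thread. As an alternative sketch (not what my script does), the same 5 minute / 15 minute cycle can be derived from a single start timestamp, which makes the logic easy to test. ControlWindow and its method names are hypothetical.

```python
class ControlWindow:
    """Tracks one control period followed by one rest period, in seconds."""

    def __init__(self, control_seconds=300, rest_seconds=900):
        self.control_seconds = control_seconds
        self.rest_seconds = rest_seconds
        self.started_at = None  # timestamp at which the current user took control

    def phase(self, now):
        """Return 'idle', 'control' or 'rest' for the timestamp `now`."""
        if self.started_at is None:
            return "idle"
        elapsed = now - self.started_at
        if elapsed < self.control_seconds:
            return "control"
        if elapsed < self.control_seconds + self.rest_seconds:
            return "rest"
        return "idle"

    def take_control(self, now):
        """Start a new control period unless someone is in control or the camera rests."""
        if self.phase(now) != "idle":
            return False
        self.started_at = now
        return True


window = ControlWindow()
print(window.take_control(now=0))  # True
print(window.phase(now=299))       # control
print(window.phase(now=300))       # rest
print(window.phase(now=1200))      # idle
```

In a running script, now would come from something like time.monotonic(), and the chat polling could be paused whenever the phase is ‘rest’.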

Period Timer

The period timers are somewhat different. Since I only have 10,000 credits from Google per day, I needed a fixed period per day in which I could run the script and stay within that limit. Monitoring the actual usage showed about 30 credits per minute (excluding message posts, which ‘cost’ more), so I figured that a time window of about 4 hours would be the maximum: 30 credits × 240 minutes is 7,200 credits, which leaves room for the posts. So I set this ‘valid period’ to between 17:00 and 21:00 CEST. The script itself is launched by a cron job at 17:00, so I needed a timer that would stop the chat object and kill the program. I’ve created this function to return True if the input time is 21:00 or later:

def after_period(timenow):

    if datetime.strptime(timenow, '%H:%M') >= datetime.strptime('21:00', '%H:%M'):
        return True
    else:
        return False

Then I created a break loop that checks every 60 seconds whether we’re past the time threshold. If so, it sends out a YouTube live chat message saying that the chatbot is inactive and will be available tomorrow at 17:00. It then sleeps for 15 seconds to give the chat object time to send the message, and finally stops the chat object, which kills the thread and therefore the whole program.

def break_loop():
    timenow = datetime.now().strftime('%H:%M')

    while not after_period(timenow):
        timenow = datetime.now().strftime('%H:%M')
        sleep(60)
    else:
        chat_obj.send_message('Chatbot inactive. Camera control is possible again tomorrow at 17:00 CEST / 15:00 UTC', livechat_id)
        sleep(15)
        chat_obj.stop()
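
The after_period function compares times by parsing ‘HH:MM’ strings. A minimal alternative sketch is to compare datetime.time objects directly, which also lets you check the start of the window. The names in_active_window, WINDOW_START and WINDOW_END are made up, and the times are local times, just like in the script above.

```python
from datetime import datetime, time

WINDOW_START = time(17, 0)
WINDOW_END = time(21, 0)


def in_active_window(now, start=WINDOW_START, end=WINDOW_END):
    """True while `now` (a datetime.time) lies in the half-open window [start, end)."""
    return start <= now < end


print(in_active_window(time(17, 0)))   # True
print(in_active_window(time(20, 59)))  # True
print(in_active_window(time(21, 0)))   # False
print(in_active_window(datetime.now().time()))  # depends on when you run it
```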

Start

This function is the go-to point for the main try block and is also called at the end of the countdown timer. It looks at the current time and at whether or not the chat object is running. If it’s not running, it sets running to True and joins the chat object thread. If it’s running, it starts the chat object and sends a message to the YouTube live chat saying that the camera is controllable again (after the countdown timer). It also checks whether we’re outside the valid period, in which case it stops the chat object and thus the program itself.

def start():
    timenow = datetime.now().strftime('%H:%M')
    global running
    try:
        # Check the time window first; after the two 'running' checks it could never be reached
        if after_period(timenow):
            chat_obj.stop()
        elif not running:
            running = True
            chat_obj.join()
        else:
            chat_obj.start()
            sleep(2)
            chat_obj.send_message('The camera is controllable again', livechat_id)
    except Exception as exc:
        print('Start Try Error:', exc)

Try/Except

This block sets the whole thing in motion: it starts the chat object, subscribes the respond function, and sends out a live chat message that the chatbot is active and the camera can be controlled, followed by launching the break_loop thread and the start function.

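# Assumption: chat_obj, livechat_id and camera are created before this point,
# e.g. chat_obj = YoutubeLiveChat("oauth_creds", [livechat_id])
try:
    chat_obj.start()
    chat_obj.subscribe_chat_message(respond)
    # Adjacent string literals are joined, so no stray spaces end up in the message
    chat_obj.send_message('Chatbot active. Camera control possible until 21:00 CEST / 19:00 UTC. '
                          'Click on "Show more" in the video description to see how you can control the camera',
                          livechat_id)
    break_thread = threading.Thread(target=break_loop)
    break_thread.start()
    start()
except Exception as exc:
    print('Main Try Error:', exc)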

Crontab

Launch the script every day at 17:00 using crontab with python3 from within the virtual environment.

crontab -u <username> -e

Insert this line:

0 17 * * * /path_to_virtual_env/bin/python3 /path_to_virtual_env/chatbot.py > /path_to_virtual_env/output.log 2>&1

Conclusion

I’m fairly new to Python and programming as a whole, but I thought it would be fun to make something like this since I haven’t seen it on YouTube thus far. I’ll most likely make changes in the future and request more credits from Google in order to have this script running 24/7.

If you want to see the chatbot in action on my live stream, check out the YouTube link below.
