Analyzing videos

The first step is to obtain an authentication token, which EyeQuant's customer success managers or technical staff usually provide. The class below attaches that token to every request.

class BearerTokenAuth(requests.auth.AuthBase):
    def __init__(self, access_token):
        self.access_token = access_token

    def __call__(self, request):
        request.headers["Authorization"] = "Bearer {}".format(self.access_token)
        return request

Next, create a session. A Requests Session object persists settings such as authentication and headers across requests to the same site. You create one by calling requests.Session(). The base URL of the API is defined alongside it because the later requests build on it.

api = requests.Session()
base_url = "https://api.eyequant.com/v2"

Let's add the authentication token to our Session instance.

api.auth = BearerTokenAuth(access_token)
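
To confirm that the session really attaches the token before any network call is made, a request can be prepared without being sent. The following is a minimal sketch: the token value is a placeholder, and the class is repeated only so that the snippet runs on its own.

```python
import requests


class BearerTokenAuth(requests.auth.AuthBase):
    def __init__(self, access_token):
        self.access_token = access_token

    def __call__(self, request):
        request.headers["Authorization"] = "Bearer {}".format(self.access_token)
        return request


api = requests.Session()
api.auth = BearerTokenAuth("dummy-token")  # placeholder, not a real token

# prepare_request() merges the session settings (including auth) into the
# request without sending anything over the network.
prepared = api.prepare_request(
    requests.Request("GET", "https://api.eyequant.com/v2/upload-urls")
)
authorization_header = prepared.headers["Authorization"]
print(authorization_header)
```

If the printed header has the form "Bearer <token>", every request made through the session will carry it.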

The next step is to request a URL to which you will upload your file. It is a presigned URL pointing to the data storage for your account, which we created when we set up your API access.

upload_url_response = api.get(
    base_url + "/upload-urls",
    data=json.dumps({}),
    headers={"Content-Type": "application/json"},
)
upload_url_response.raise_for_status()
upload_url = upload_url_response.json()["url"]

The following code uploads the file to the storage associated with your account. It sends a PUT request containing the video data to the presigned URL.

with open(
    os.path.join(os.path.dirname(__file__), "VIDEO.webm"), "rb"
) as video_file:
    response = requests.put(upload_url, data=video_file.read())
    response.raise_for_status()

The video must be a WebM file, smaller than 100 MB, and no longer than 60 seconds.
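
Checking these limits locally before uploading can save a failed round trip. The sketch below is one possible pre-flight check, not part of the EyeQuant API: the function name check_video_file is hypothetical, "100 MB" is assumed to mean 100 * 1000 * 1000 bytes (the stricter reading), and the duration limit is not checked because that would need a media library.

```python
import os

# Assumption: "100 MB" is read as 100 * 1000 * 1000 bytes, the stricter reading.
MAX_VIDEO_BYTES = 100 * 1000 * 1000
# WebM files are Matroska/EBML containers and start with these four bytes.
# Plain Matroska (.mkv) files share this header, so this is only a rough check.
EBML_MAGIC = b"\x1a\x45\xdf\xa3"


def check_video_file(path, max_bytes=MAX_VIDEO_BYTES):
    """Return a list of problems found; an empty list means the checks passed."""
    problems = []
    size = os.path.getsize(path)
    if size >= max_bytes:
        problems.append("file is {} bytes, limit is {}".format(size, max_bytes))
    with open(path, "rb") as video_file:
        if video_file.read(4) != EBML_MAGIC:
            problems.append("file does not start with the EBML header")
    return problems
```

Calling check_video_file("VIDEO.webm") before the PUT request and stopping when the returned list is not empty keeps obviously invalid files from being uploaded.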

Now that the file is in your bucket, you can create an analysis. You do that by sending a POST request containing the specification of the planned analysis to the API.

analysis_configuration = {
    "input": {
        "content": upload_url,
        "title": "Video Testing",
    }
}

create_response = api.post(
    base_url + "/analyses/video",
    data=json.dumps(analysis_configuration),
    headers={"Content-Type": "application/json"},
)

create_response.raise_for_status()

analysis_url = create_response.json()["location"]

The analysis takes at least a few minutes, depending on the size of your video. In the meantime, you can poll the API for the status of the analysis every 30 seconds.

analysis = {"status": "pending"}
while analysis["status"] == "pending":
    time.sleep(30)
    analysis_response = api.get(analysis_url)
    analysis_response.raise_for_status()
    analysis = analysis_response.json()
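
The loop above runs until the status changes, however long that takes. A variant with an upper time limit might look like the sketch below. The helper name poll_until_done and the 30-minute default timeout are assumptions for illustration, not values documented by the API. The sleep and clock parameters exist only so the function can be exercised without real waiting.

```python
import time


def poll_until_done(fetch_status, interval_seconds=30, timeout_seconds=1800,
                    sleep=time.sleep, clock=time.monotonic):
    """Call fetch_status() until it stops returning "pending" or time runs out."""
    deadline = clock() + timeout_seconds
    while True:
        analysis = fetch_status()
        if analysis["status"] != "pending":
            return analysis
        if clock() >= deadline:
            raise TimeoutError(
                "analysis still pending after {} s".format(timeout_seconds)
            )
        sleep(interval_seconds)


# Possible use with the session from this tutorial (not executed here):
#
# def fetch_status():
#     response = api.get(analysis_url)
#     response.raise_for_status()
#     return response.json()
#
# analysis = poll_until_done(fetch_status)
```

Unlike the loop above, this variant queries the status once before the first wait, so an analysis that has already finished is returned immediately.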

Now check the status returned by the API. If the analysis did not succeed, exit with an error; otherwise continue and do something with the results, such as persisting them.

if analysis["status"] != "success":
    print("Video analysis failed.")
    sys.exit(1)
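
Persisting the results could be as simple as writing the returned document to disk. The sketch below assumes nothing about the response beyond it being the JSON-decoded dictionary held in analysis. The function name save_analysis and the output file name are hypothetical.

```python
import json


def save_analysis(analysis, path):
    """Write the analysis document returned by the API to a JSON file."""
    with open(path, "w", encoding="utf-8") as output_file:
        json.dump(analysis, output_file, indent=2, sort_keys=True)


# Possible use after a successful analysis (not executed here):
# save_analysis(analysis, "video-analysis.json")
```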

The whole code looks like this:

import json
import os
import sys
import time

import requests


access_token = "YOUR-API-ACCESS-TOKEN"


class BearerTokenAuth(requests.auth.AuthBase):
    def __init__(self, access_token):
        self.access_token = access_token

    def __call__(self, request):
        request.headers["Authorization"] = "Bearer {}".format(self.access_token)
        return request


api = requests.Session()
base_url = "https://api.eyequant.com/v2"

api.auth = BearerTokenAuth(access_token)

upload_url_response = api.get(
    base_url + "/upload-urls",
    data=json.dumps({}),
    headers={"Content-Type": "application/json"},
)
upload_url_response.raise_for_status()
upload_url = upload_url_response.json()["url"]

with open(os.path.join(os.path.dirname(__file__), "VIDEO.webm"), "rb") as video_file:
    response = requests.put(upload_url, data=video_file.read())
    response.raise_for_status()

analysis_configuration = {
    "input": {
        "content": upload_url,
        "title": "Video Testing",
    }
}

create_response = api.post(
    base_url + "/analyses/video",
    data=json.dumps(analysis_configuration),
    headers={"Content-Type": "application/json"},
)

create_response.raise_for_status()

analysis_url = create_response.json()["location"]

analysis = {"status": "pending"}
while analysis["status"] == "pending":
    time.sleep(30)
    analysis_response = api.get(analysis_url)
    analysis_response.raise_for_status()
    analysis = analysis_response.json()

if analysis["status"] != "success":
    print("Video analysis failed.")
    sys.exit(1)

And that's all.