Python区块链技术:10个区块链应用的开发案例

文摘   2024-11-25 19:02   江苏  

区块链技术近年来发展迅速,不仅在金融领域有广泛应用,在其他行业也逐渐崭露头角。今天,我们就来探讨10个使用Python开发的区块链应用案例,帮助大家更好地理解和应用这项技术。

1. 简单的区块链实现

首先,我们从一个简单的区块链实现开始。这个例子将帮助你理解区块链的基本结构和工作原理。

import hashlib
import time

class Block:
    """A single chain entry: position, parent hash, creation time, payload and digest."""

    def __init__(self, index, previous_hash, timestamp, data, hash):
        # Assigned in signature order so the instance ``__dict__`` keeps that key order.
        self.index, self.previous_hash = index, previous_hash
        self.timestamp, self.data = timestamp, data
        self.hash = hash

def calculate_hash(index, previous_hash, timestamp, data):
    """Return the SHA-256 hex digest of the four fields' ``str()`` forms, concatenated in order."""
    pieces = (index, previous_hash, timestamp, data)
    joined = "".join(str(piece) for piece in pieces)
    return hashlib.sha256(joined.encode('utf-8')).hexdigest()

def create_genesis_block():
    """Build the first block of the chain (index 0, previous hash "0").

    The clock is read once so that the timestamp stored on the block is
    exactly the one that went into its hash.
    """
    timestamp = int(time.time())
    data = "Genesis Block"
    return Block(0, "0", timestamp, data, calculate_hash(0, "0", timestamp, data))

def create_new_block(previous_block, data):
    """Create the block that follows ``previous_block`` and carries ``data``.

    The new block's index is the predecessor's index plus one, and its
    ``previous_hash`` is the predecessor's ``hash``.
    """
    next_index = previous_block.index + 1
    created_at = int(time.time())
    parent_hash = previous_block.hash
    digest = calculate_hash(next_index, parent_hash, created_at, data)
    return Block(next_index, parent_hash, created_at, data, digest)

# Create the genesis block and start the chain with it.
blockchain = [create_genesis_block()]
previous_block = blockchain[0]

# Append new blocks, each one linked to the block added before it.
num_of_blocks_to_add = 5
for i in range(num_of_blocks_to_add):
    new_block_data = f"Block {i+1} data"
    new_block = create_new_block(previous_block, new_block_data)
    blockchain.append(new_block)
    # The block just added becomes the parent of the next iteration's block.
    previous_block = new_block
    print(f"Block #{new_block.index} has been added to the blockchain!")
    print(f"Hash: {new_block.hash}\n")

2. 基于Flask的简单区块链API

接下来,我们使用Flask框架创建一个简单的区块链API,以便可以通过HTTP请求与区块链交互。

from flask import Flask, request, jsonify
import hashlib
import time

app = Flask(__name__)

class Block:
    """Chain entry holding its index, parent hash, timestamp, payload and own hash."""

    def __init__(self, index, previous_hash, timestamp, data, hash):
        # Keep signature order: the /chain endpoint serializes ``__dict__`` directly.
        self.index, self.previous_hash = index, previous_hash
        self.timestamp, self.data = timestamp, data
        self.hash = hash

def calculate_hash(index, previous_hash, timestamp, data):
    """Hash a block's fields: SHA-256 over their ``str()`` forms joined without separators."""
    text = "".join(map(str, (index, previous_hash, timestamp, data)))
    digest = hashlib.sha256(text.encode('utf-8'))
    return digest.hexdigest()

def create_genesis_block():
    """Build the chain's first block (index 0, previous hash "0").

    A single timestamp is used for both the stored field and the hash input,
    so the two cannot disagree.
    """
    timestamp = int(time.time())
    data = "Genesis Block"
    block_hash = calculate_hash(0, "0", timestamp, data)
    return Block(0, "0", timestamp, data, block_hash)

def create_new_block(previous_block, data):
    """Return a new block chained onto ``previous_block`` with ``data`` as payload."""
    position = previous_block.index + 1
    now = int(time.time())
    link = previous_block.hash
    return Block(position, link, now, data, calculate_hash(position, link, now, data))

# The chain starts with the genesis block; previous_block initially points at it.
blockchain = [create_genesis_block()]
previous_block = blockchain[0]

@app.route('/mine', methods=['POST'])
def mine():
    """Append a block built from the posted JSON ``data`` field and return it.

    Returns the new block's fields with status 200, or a 400 error when the
    request body is not a JSON object containing ``data``.
    """
    # The declaration has to come before any use of the name in this
    # function; otherwise Python 3 rejects the module with a SyntaxError.
    global previous_block

    payload = request.get_json(silent=True)
    if not isinstance(payload, dict) or 'data' not in payload:
        return jsonify({'error': "JSON body with a 'data' field is required"}), 400

    new_block = create_new_block(previous_block, payload['data'])
    blockchain.append(new_block)
    previous_block = new_block
    response = {
        'index': new_block.index,
        'previous_hash': new_block.previous_hash,
        'timestamp': new_block.timestamp,
        'data': new_block.data,
        'hash': new_block.hash
    }
    return jsonify(response), 200

@app.route('/chain', methods=['GET'])
def full_chain():
    """Return every block as its attribute dict, together with the chain length."""
    serialized_blocks = [vars(entry) for entry in blockchain]
    body = {'chain': serialized_blocks, 'length': len(blockchain)}
    return jsonify(body), 200

# Start Flask's built-in server only when this file is run as a script.
if __name__ == '__main__':
    # Host '0.0.0.0' makes the server listen on all network interfaces, port 5000.
    app.run(host='0.0.0.0', port=5000)

3. 分布式区块链网络

为了实现分布式区块链网络,我们可以使用Peer-to-Peer (P2P)技术。这里我们使用socket库来实现一个简单的P2P网络。

import socket
import threading
import json

# 定义节点类
# Node: a minimal TCP relay used to demonstrate message propagation.
class Node:
    """Accept TCP connections and relay every received message to all of them.

    ``nodes`` holds the accepted client sockets. ``blockchain`` is created
    empty and is not touched by any method of this class.
    """

    def __init__(self, host, port):
        """Bind a listening TCP socket on ``(host, port)``."""
        self.host = host
        self.port = port
        self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.server.bind((host, port))
        self.server.listen(5)
        self.nodes = []
        self.blockchain = []

    def start(self):
        """Accept connections forever, handling each one in its own thread."""
        print(f"Node started on {self.host}:{self.port}")
        while True:
            client_socket, _ = self.server.accept()
            self.nodes.append(client_socket)
            threading.Thread(target=self.handle_client, args=(client_socket,)).start()

    def handle_client(self, client_socket):
        """Relay messages from one client until it disconnects or fails."""
        try:
            while True:
                data = client_socket.recv(1024)
                if not data:
                    # recv() returns b"" once the peer has closed the
                    # connection; without this exit the loop spins forever.
                    break
                message = data.decode('utf-8')
                print(f"Received: {message}")
                self.broadcast(message)
        except (OSError, UnicodeDecodeError) as exc:
            print(f"Dropping client: {exc}")
        finally:
            client_socket.close()
            # Guarded: the socket may already be gone from the list.
            if client_socket in self.nodes:
                self.nodes.remove(client_socket)

    def broadcast(self, message):
        """Send ``message`` (a str) to every connected socket, skipping failures."""
        encoded = message.encode('utf-8')
        # Iterate over a snapshot: handler threads may remove sockets meanwhile.
        for node in list(self.nodes):
            try:
                node.sendall(encoded)
            except OSError:
                continue

    def send_message(self, message):
        """Send ``message`` to every connected socket (same as ``broadcast``)."""
        self.broadcast(message)

if __name__ == '__main__':
    # Imported here because this snippet's import list does not include ``time``.
    import time

    node = Node('127.0.0.1', 5000)
    threading.Thread(target=node.start).start()
    # Simulate sending a message after giving the listener thread time to start.
    time.sleep(2)
    node.send_message(json.dumps({'type': 'block', 'data': 'Block 1'}))

4. 区块链投票系统

区块链可以用于构建透明且不可篡改的投票系统。以下是一个简单的投票系统的实现。

class VoteBlock:
    """Chain entry whose payload is a single vote."""

    def __init__(self, index, previous_hash, timestamp, vote, hash):
        # Attribute order follows the signature.
        self.index, self.previous_hash = index, previous_hash
        self.timestamp, self.vote = timestamp, vote
        self.hash = hash

def calculate_vote_hash(index, previous_hash, timestamp, vote):
    """Return the SHA-256 hex digest of the fields' ``str()`` forms concatenated in order."""
    parts = [str(field) for field in (index, previous_hash, timestamp, vote)]
    return hashlib.sha256("".join(parts).encode('utf-8')).hexdigest()

def create_genesis_vote_block():
    """Build the first vote block (index 0, previous hash "0").

    The timestamp is taken once so the stored value matches the hashed one.
    """
    timestamp = int(time.time())
    vote = "Genesis Vote"
    return VoteBlock(0, "0", timestamp, vote, calculate_vote_hash(0, "0", timestamp, vote))

def create_new_vote_block(previous_block, vote):
    """Create the vote block that follows ``previous_block``."""
    next_index = previous_block.index + 1
    created_at = int(time.time())
    parent_hash = previous_block.hash
    digest = calculate_vote_hash(next_index, parent_hash, created_at, vote)
    return VoteBlock(next_index, parent_hash, created_at, vote, digest)

# The vote chain starts with its genesis block; previous_vote_block initially points at it.
vote_blockchain = [create_genesis_vote_block()]
previous_vote_block = vote_blockchain[0]

def add_vote(vote):
    """Append a block carrying ``vote`` to the vote chain and print its index and hash."""
    # Must be declared before the name is read in this function;
    # declaring it afterwards is a SyntaxError in Python 3.
    global previous_vote_block

    new_vote_block = create_new_vote_block(previous_vote_block, vote)
    vote_blockchain.append(new_vote_block)
    previous_vote_block = new_vote_block
    print(f"Vote Block #{new_vote_block.index} has been added to the blockchain!")
    print(f"Hash: {new_vote_block.hash}\n")

# Try it out: record three sample votes.
add_vote("Candidate A")
add_vote("Candidate B")
add_vote("Candidate A")

5. 区块链供应链管理

区块链可以用于提高供应链的透明度和安全性。以下是一个简单的供应链管理系统的实现。

class SupplyChainBlock:
    """Chain entry whose payload is a ``product_info`` record."""

    def __init__(self, index, previous_hash, timestamp, product_info, hash):
        # Attribute order follows the signature.
        self.index, self.previous_hash = index, previous_hash
        self.timestamp, self.product_info = timestamp, product_info
        self.hash = hash

def calculate_supply_chain_hash(index, previous_hash, timestamp, product_info):
    """Return the SHA-256 hex digest of the fields' ``str()`` forms concatenated in order."""
    fields = (index, previous_hash, timestamp, product_info)
    material = "".join(map(str, fields)).encode('utf-8')
    return hashlib.sha256(material).hexdigest()

def create_genesis_supply_chain_block():
    """Build the first supply-chain block (index 0, previous hash "0").

    The timestamp is taken once so the stored value matches the hashed one.
    """
    timestamp = int(time.time())
    product_info = "Genesis Product"
    block_hash = calculate_supply_chain_hash(0, "0", timestamp, product_info)
    return SupplyChainBlock(0, "0", timestamp, product_info, block_hash)

def create_new_supply_chain_block(previous_block, product_info):
    """Create the supply-chain block that follows ``previous_block``."""
    next_index = previous_block.index + 1
    created_at = int(time.time())
    parent_hash = previous_block.hash
    digest = calculate_supply_chain_hash(next_index, parent_hash, created_at, product_info)
    return SupplyChainBlock(next_index, parent_hash, created_at, product_info, digest)

# The supply chain starts with its genesis block; previous_supply_chain_block initially points at it.
supply_chain_blockchain = [create_genesis_supply_chain_block()]
previous_supply_chain_block = supply_chain_blockchain[0]

def add_product(product_info):
    """Append a block carrying ``product_info`` to the supply chain and print its index and hash."""
    # Declared first: using the name before ``global`` is a SyntaxError in Python 3.
    global previous_supply_chain_block

    new_supply_chain_block = create_new_supply_chain_block(previous_supply_chain_block, product_info)
    supply_chain_blockchain.append(new_supply_chain_block)
    previous_supply_chain_block = new_supply_chain_block
    print(f"Supply Chain Block #{new_supply_chain_block.index} has been added to the blockchain!")
    print(f"Hash: {new_supply_chain_block.hash}\n")

# Try it out: record two sample products.
add_product({"product_id": 1, "name": "Apple", "origin": "China"})
add_product({"product_id": 2, "name": "Banana", "origin": "India"})

6. 区块链身份验证

区块链可以用于实现安全的身份验证系统。以下是一个简单的身份验证系统的实现。

class IdentityBlock:
    """Chain entry whose payload is an ``identity_info`` record."""

    def __init__(self, index, previous_hash, timestamp, identity_info, hash):
        # Attribute order follows the signature.
        self.index, self.previous_hash = index, previous_hash
        self.timestamp, self.identity_info = timestamp, identity_info
        self.hash = hash

def calculate_identity_hash(index, previous_hash, timestamp, identity_info):
    """Return the SHA-256 hex digest of the fields' ``str()`` forms concatenated in order."""
    hasher_input = "".join(
        str(field) for field in (index, previous_hash, timestamp, identity_info)
    )
    return hashlib.sha256(hasher_input.encode('utf-8')).hexdigest()

def create_genesis_identity_block():
    """Build the first identity block (index 0, previous hash "0").

    The timestamp is taken once so the stored value matches the hashed one.
    """
    timestamp = int(time.time())
    identity_info = "Genesis Identity"
    block_hash = calculate_identity_hash(0, "0", timestamp, identity_info)
    return IdentityBlock(0, "0", timestamp, identity_info, block_hash)

def create_new_identity_block(previous_block, identity_info):
    """Create the identity block that follows ``previous_block``."""
    next_index = previous_block.index + 1
    created_at = int(time.time())
    parent_hash = previous_block.hash
    digest = calculate_identity_hash(next_index, parent_hash, created_at, identity_info)
    return IdentityBlock(next_index, parent_hash, created_at, identity_info, digest)

# The identity chain starts with its genesis block; previous_identity_block initially points at it.
identity_blockchain = [create_genesis_identity_block()]
previous_identity_block = identity_blockchain[0]

def add_identity(identity_info):
    """Append a block carrying ``identity_info`` to the identity chain and print its index and hash."""
    # Declared first: using the name before ``global`` is a SyntaxError in Python 3.
    global previous_identity_block

    new_identity_block = create_new_identity_block(previous_identity_block, identity_info)
    identity_blockchain.append(new_identity_block)
    previous_identity_block = new_identity_block
    print(f"Identity Block #{new_identity_block.index} has been added to the blockchain!")
    print(f"Hash: {new_identity_block.hash}\n")

# Try it out: record two sample identities.
add_identity({"user_id": 1, "name": "Alice", "email": "alice@example.com"})
add_identity({"user_id": 2, "name": "Bob", "email": "bob@example.com"})

7. 区块链数字版权管理

区块链可以用于保护数字版权,确保创作者的权益。以下是一个简单的数字版权管理系统的实现。

class DigitalRightBlock:
    """Chain entry whose payload is a ``right_info`` record."""

    def __init__(self, index, previous_hash, timestamp, right_info, hash):
        # Attribute order follows the signature.
        self.index, self.previous_hash = index, previous_hash
        self.timestamp, self.right_info = timestamp, right_info
        self.hash = hash

def calculate_digital_right_hash(index, previous_hash, timestamp, right_info):
    """Return the SHA-256 hex digest of the fields' ``str()`` forms concatenated in order."""
    as_text = [str(index), str(previous_hash), str(timestamp), str(right_info)]
    encoded = "".join(as_text).encode('utf-8')
    return hashlib.sha256(encoded).hexdigest()

def create_genesis_digital_right_block():
    """Build the first digital-right block (index 0, previous hash "0").

    The timestamp is taken once so the stored value matches the hashed one.
    """
    timestamp = int(time.time())
    right_info = "Genesis Right"
    block_hash = calculate_digital_right_hash(0, "0", timestamp, right_info)
    return DigitalRightBlock(0, "0", timestamp, right_info, block_hash)

def create_new_digital_right_block(previous_block, right_info):
    """Create the digital-right block that follows ``previous_block``."""
    next_index = previous_block.index + 1
    created_at = int(time.time())
    parent_hash = previous_block.hash
    digest = calculate_digital_right_hash(next_index, parent_hash, created_at, right_info)
    return DigitalRightBlock(next_index, parent_hash, created_at, right_info, digest)

# The digital-right chain starts with its genesis block; previous_digital_right_block initially points at it.
digital_right_blockchain = [create_genesis_digital_right_block()]
previous_digital_right_block = digital_right_blockchain[0]

def add_right(right_info):
    """Append a block carrying ``right_info`` to the digital-right chain and print its index and hash."""
    # Declared first: using the name before ``global`` is a SyntaxError in Python 3.
    global previous_digital_right_block

    new_digital_right_block = create_new_digital_right_block(previous_digital_right_block, right_info)
    digital_right_blockchain.append(new_digital_right_block)
    previous_digital_right_block = new_digital_right_block
    print(f"Digital Right Block #{new_digital_right_block.index} has been added to the blockchain!")
    print(f"Hash: {new_digital_right_block.hash}\n")

# Try it out: record two sample digital-right entries.
add_right({"content_id": 1, "creator": "Alice", "rights": "All rights reserved"})
add_right({"content_id": 2, "creator": "Bob", "rights": "Creative Commons"})

8. 区块链医疗记录管理

区块链可以用于存储和管理医疗记录,确保数据的安全性和隐私性。以下是一个简单的医疗记录管理系统的实现。

class MedicalRecordBlock:
    """Chain entry whose payload is a ``record_info`` record."""

    def __init__(self, index, previous_hash, timestamp, record_info, hash):
        # Attribute order follows the signature.
        self.index, self.previous_hash = index, previous_hash
        self.timestamp, self.record_info = timestamp, record_info
        self.hash = hash

def calculate_medical_record_hash(index, previous_hash, timestamp, record_info):
    """Return the SHA-256 hex digest of the fields' ``str()`` forms concatenated in order."""
    ordered_fields = (index, previous_hash, timestamp, record_info)
    preimage = "".join(map(str, ordered_fields))
    return hashlib.sha256(preimage.encode('utf-8')).hexdigest()

def create_genesis_medical_record_block():
    """Build the first medical-record block (index 0, previous hash "0").

    The timestamp is taken once so the stored value matches the hashed one.
    """
    timestamp = int(time.time())
    record_info = "Genesis Record"
    block_hash = calculate_medical_record_hash(0, "0", timestamp, record_info)
    return MedicalRecordBlock(0, "0", timestamp, record_info, block_hash)

def create_new_medical_record_block(previous_block, record_info):
    """Create the medical-record block that follows ``previous_block``."""
    next_index = previous_block.index + 1
    created_at = int(time.time())
    parent_hash = previous_block.hash
    digest = calculate_medical_record_hash(next_index, parent_hash, created_at, record_info)
    return MedicalRecordBlock(next_index, parent_hash, created_at, record_info, digest)

# The medical-record chain starts with its genesis block; previous_medical_record_block initially points at it.
medical_record_blockchain = [create_genesis_medical_record_block()]
previous_medical_record_block = medical_record_blockchain[0]

def add_record(record_info):
    """Append a block carrying ``record_info`` to the medical-record chain and print its index and hash."""
    # Declared first: using the name before ``global`` is a SyntaxError in Python 3.
    global previous_medical_record_block

    new_medical_record_block = create_new_medical_record_block(previous_medical_record_block, record_info)
    medical_record_blockchain.append(new_medical_record_block)
    previous_medical_record_block = new_medical_record_block
    print(f"Medical Record Block #{new_medical_record_block.index} has been added to the blockchain!")
    print(f"Hash: {new_medical_record_block.hash}\n")

# Try it out: record two sample medical records.
add_record({"patient_id": 1, "doctor": "Dr. Smith", "diagnosis": "Flu"})
add_record({"patient_id": 2, "doctor": "Dr. Johnson", "diagnosis": "Cold"})

9. 区块链房地产交易

区块链可以用于记录和验证房地产交易,确保交易的透明性和安全性。以下是一个简单的房地产交易系统的实现。

class RealEstateBlock:
    """Chain entry whose payload is a ``transaction_info`` record."""

    def __init__(self, index, previous_hash, timestamp, transaction_info, hash):
        # Attribute order follows the signature.
        self.index, self.previous_hash = index, previous_hash
        self.timestamp, self.transaction_info = timestamp, transaction_info
        self.hash = hash

def calculate_real_estate_hash(index, previous_hash, timestamp, transaction_info):
    """Return the SHA-256 hex digest of the fields' ``str()`` forms concatenated in order."""
    chunks = [str(item) for item in (index, previous_hash, timestamp, transaction_info)]
    sha = hashlib.sha256("".join(chunks).encode('utf-8'))
    return sha.hexdigest()

def create_genesis_real_estate_block():
    """Build the first real-estate block (index 0, previous hash "0").

    The timestamp is taken once so the stored value matches the hashed one.
    """
    timestamp = int(time.time())
    transaction_info = "Genesis Transaction"
    block_hash = calculate_real_estate_hash(0, "0", timestamp, transaction_info)
    return RealEstateBlock(0, "0", timestamp, transaction_info, block_hash)

def create_new_real_estate_block(previous_block, transaction_info):
    """Create the real-estate block that follows ``previous_block``."""
    next_index = previous_block.index + 1
    created_at = int(time.time())
    parent_hash = previous_block.hash
    digest = calculate_real_estate_hash(next_index, parent_hash, created_at, transaction_info)
    return RealEstateBlock(next_index, parent_hash, created_at, transaction_info, digest)

# The real-estate chain starts with its genesis block; previous_real_estate_block initially points at it.
real_estate_blockchain = [create_genesis_real_estate_block()]
previous_real_estate_block = real_estate_blockchain[0]

def add_transaction(transaction_info):
    """Append a block carrying ``transaction_info`` to the real-estate chain and print its index and hash."""
    # Declared first: using the name before ``global`` is a SyntaxError in Python 3.
    global previous_real_estate_block

    new_real_estate_block = create_new_real_estate_block(previous_real_estate_block, transaction_info)
    real_estate_blockchain.append(new_real_estate_block)
    previous_real_estate_block = new_real_estate_block
    print(f"Real Estate Block #{new_real_estate_block.index} has been added to the blockchain!")
    print(f"Hash: {new_real_estate_block.hash}\n")

# Try it out: record two sample real-estate transactions.
add_transaction({"property_id": 1, "seller": "Alice", "buyer": "Bob", "price": 500000})
add_transaction({"property_id": 2, "seller": "Charlie", "buyer": "David", "price": 750000})

10. 区块链智能合约

智能合约是区块链的一个重要应用,可以自动执行合同条款。以下示例做了简化:它只把合约条款作为数据记录到区块链上,并不包含自动执行条款的逻辑。

class SmartContractBlock:
    """Chain entry whose payload is a ``contract_info`` record."""

    def __init__(self, index, previous_hash, timestamp, contract_info, hash):
        # Attribute order follows the signature.
        self.index, self.previous_hash = index, previous_hash
        self.timestamp, self.contract_info = timestamp, contract_info
        self.hash = hash

def calculate_smart_contract_hash(index, previous_hash, timestamp, contract_info):
    """Return the SHA-256 hex digest of the fields' ``str()`` forms concatenated in order."""
    serialized = "".join(
        str(component) for component in (index, previous_hash, timestamp, contract_info)
    )
    return hashlib.sha256(serialized.encode('utf-8')).hexdigest()

def create_genesis_smart_contract_block():
    """Build the first smart-contract block (index 0, previous hash "0").

    The timestamp is taken once so the stored value matches the hashed one.
    """
    timestamp = int(time.time())
    contract_info = "Genesis Contract"
    block_hash = calculate_smart_contract_hash(0, "0", timestamp, contract_info)
    return SmartContractBlock(0, "0", timestamp, contract_info, block_hash)

def create_new_smart_contract_block(previous_block, contract_info):
    """Create the smart-contract block that follows ``previous_block``."""
    next_index = previous_block.index + 1
    created_at = int(time.time())
    parent_hash = previous_block.hash
    digest = calculate_smart_contract_hash(next_index, parent_hash, created_at, contract_info)
    return SmartContractBlock(next_index, parent_hash, created_at, contract_info, digest)

# The smart-contract chain starts with its genesis block; previous_smart_contract_block initially points at it.
smart_contract_blockchain = [create_genesis_smart_contract_block()]
previous_smart_contract_block = smart_contract_blockchain[0]

def add_contract(contract_info):
    """Append a block carrying ``contract_info`` to the smart-contract chain and print its index and hash."""
    # Declared first: using the name before ``global`` is a SyntaxError in Python 3.
    global previous_smart_contract_block

    new_smart_contract_block = create_new_smart_contract_block(previous_smart_contract_block, contract_info)
    smart_contract_blockchain.append(new_smart_contract_block)
    previous_smart_contract_block = new_smart_contract_block
    print(f"Smart Contract Block #{new_smart_contract_block.index} has been added to the blockchain!")
    print(f"Hash: {new_smart_contract_block.hash}\n")

# Try it out: record two sample contracts.
add_contract({"contract_id": 1, "parties": ["Alice", "Bob"], "terms": "Alice will pay Bob $1000 if the weather is sunny on 2023-10-01"})
add_contract({"contract_id": 2, "parties": ["Charlie", "David"], "terms": "Charlie will deliver a package to David by 2023-10-15"})

实战案例:基于区块链的投票系统

假设我们要为一个社区活动开发一个投票系统,确保投票过程的透明性和不可篡改性。我们将使用前面提到的投票系统代码,并增加一些额外的功能,如用户注册和投票验证。

import hashlib
import time
import json

class VoteBlock:
    """Chain entry holding one vote along with its index, parent hash, timestamp and hash."""

    def __init__(self, index, previous_hash, timestamp, vote, hash):
        # Attribute order follows the signature.
        self.index, self.previous_hash = index, previous_hash
        self.timestamp, self.vote = timestamp, vote
        self.hash = hash

def calculate_vote_hash(index, previous_hash, timestamp, vote):
    """Return the SHA-256 hex digest of the fields' ``str()`` forms concatenated in order."""
    concatenated = "".join(map(str, [index, previous_hash, timestamp, vote]))
    raw = concatenated.encode('utf-8')
    return hashlib.sha256(raw).hexdigest()

def create_genesis_vote_block():
    """Build the first vote block (index 0, previous hash "0").

    The timestamp is taken once so the stored value matches the hashed one.
    """
    timestamp = int(time.time())
    vote = "Genesis Vote"
    block_hash = calculate_vote_hash(0, "0", timestamp, vote)
    return VoteBlock(0, "0", timestamp, vote, block_hash)

def create_new_vote_block(previous_block, vote):
    """Return a new vote block chained onto ``previous_block``."""
    position = previous_block.index + 1
    now = int(time.time())
    link = previous_block.hash
    return VoteBlock(position, link, now, vote, calculate_vote_hash(position, link, now, vote))

# The vote chain starts with its genesis block; previous_vote_block initially points at it.
vote_blockchain = [create_genesis_vote_block()]
previous_vote_block = vote_blockchain[0]

# Registered users, keyed by user_id; filled in by register_user().
users = {}

def register_user(user_id, name):
    """Register ``user_id`` with ``name`` and a ``voted`` flag of False.

    An existing registration is left untouched; either outcome is printed.
    """
    if user_id not in users:
        users[user_id] = {'name': name, 'voted': False}
        print(f"User {name} registered with ID: {user_id}")
    else:
        print(f"User with ID {user_id} already exists.")

def add_vote(user_id, candidate):
    """Record a vote for ``candidate`` by ``user_id``.

    The vote is added only when the user is registered and has not voted
    yet; the user is then marked as having voted. Otherwise a message is
    printed and the chain is left unchanged.
    """
    # Declared first: using the name before ``global`` is a SyntaxError in Python 3.
    global previous_vote_block

    if user_id in users and not users[user_id]['voted']:
        new_vote_block = create_new_vote_block(previous_vote_block, {'user_id': user_id, 'candidate': candidate})
        vote_blockchain.append(new_vote_block)
        previous_vote_block = new_vote_block
        users[user_id]['voted'] = True
        print(f"Vote Block #{new_vote_block.index} has been added to the blockchain!")
        print(f"Hash: {new_vote_block.hash}\n")
    else:
        print(f"User with ID {user_id} has already voted or does not exist.")

# Register users.
register_user(1, "Alice")
register_user(2, "Bob")
register_user(3, "Charlie")

# Each user casts a vote.
add_vote(1, "Candidate A")
add_vote(2, "Candidate B")
add_vote(3, "Candidate A")

# Print the whole chain.
for block in vote_blockchain:
    print(f"Index: {block.index}, Hash: {block.hash}, Vote: {block.vote}")

总结

本文介绍了10个使用Python开发的区块链应用案例,包括简单的区块链实现、基于Flask的区块链API、分布式区块链网络、区块链投票系统、区块链供应链管理、区块链身份验证、区块链数字版权管理、区块链医疗记录管理、区块链房地产交易和区块链智能合约。每个案例都提供了详细的代码示例和解释,帮助你更好地理解和应用区块链技术。

好了,今天的分享就到这里了,我们下期见。如果本文对你有帮助,请动动你可爱的小手指点赞、转发、在看吧!

付费合集推荐

Python编程基础

Python办公自动化-Excel

微信公众号批量上传发布系统

文末福利

公众号消息窗口回复“编程资料”,获取Python编程、人工智能、爬虫等100+本精品电子书。

推广服务

公众号推广代运营代发服务

关注我👇,精彩不再错过


手把手PythonAI编程
分享与人工智能和python编程语言相关的笔记和项目经历。
 最新文章